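Preface:
Finally at the last step. I had a strong hunch that once the compaction actively triggered by mon was suppressed, the write volume on the system card would come down. Time to dig into the code. Details below:
After thinking it over, the plan was simple: find where mon issues compaction in the source, disable it, and see whether the write volume on the system card drops noticeably. With that goal in mind, I found the following function in /src/mon/MonitorDBStore.h: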
int apply_transaction(MonitorDBStore::TransactionRef t) {
// get a transaction handle from the underlying key-value DB
KeyValueDB::Transaction dbt = db->get_transaction();
if (do_dump) {
if (!g_conf->mon_debug_dump_json) {
bufferlist bl;
t->encode(bl);
bl.write_fd(dump_fd_binary);
} else {
t->dump(&dump_fmt, true);
dump_fmt.flush(dump_fd_json);
dump_fd_json.flush();
}
}
// list of pending compactions, each entry is (prefix, (start key, end key))
list<pair<string, pair<string,string> > > compact;
// walk the ops carried by the transaction and handle each one by type
for (list<Op>::const_iterator it = t->ops.begin();
it != t->ops.end();
++it) {
const Op& op = *it;
switch (op.type) {
case Transaction::OP_PUT:
dbt->set(op.prefix, op.key, op.bl);
break;
case Transaction::OP_ERASE:
dbt->rmkey(op.prefix, op.key);
break;
case Transaction::OP_COMPACT:
// COMPACT op: append the op's (prefix, (key, endkey)) to the compact list;
// key and endkey are the start and end of the range to compact
lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:7" << dendl;
this->print_stacktrace();
compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
break;
default:
derr << __func__ << " unknown op type " << op.type << dendl;
ceph_abort();
break;
}
}
// submit the batch to the DB synchronously; a return value >= 0 means success
int r = db->submit_transaction_sync(dbt);
if (r >= 0) {
// on success, drain the compact list and queue each compaction
while (!compact.empty()) {
lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:8" << dendl;
if (compact.front().second.first == string() &&
compact.front().second.second == string()){
db->compact_prefix_async(compact.front().first);
lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:9" << dendl;
}
else{
db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:10" << dendl;
}
compact.pop_front();
}
} else {
assert(0 == "failed to write to db");
}
return r;
}
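The code above shows that for an op of type OP_COMPACT, the op's prefix, key and endkey are appended to the compact list. After the transaction has been submitted, every entry in the list is handed to db->compact_prefix_async (when both keys are empty) or to db->compact_range_async (otherwise). The latter function, with my debug prints added, follows.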
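The "collect COMPACT ops first, queue them only after the write succeeds" pattern described above can be sketched in isolation. This is a minimal model, not Ceph code: OpType, Op, CompactRequest and collect_compactions are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>
#include <vector>

// Hypothetical stand-ins for the op types handled by apply_transaction.
enum class OpType { Put, Erase, Compact };

struct Op {
  OpType type;
  std::string prefix;
  std::string key;
  std::string endkey;  // only meaningful for Compact
};

// A deferred compaction request: a prefix plus an optional key range.
struct CompactRequest {
  std::string prefix, start, end;
  // Both keys empty means "compact the whole prefix".
  bool whole_prefix() const { return start.empty() && end.empty(); }
};

// Separate the ops of one transaction into writes (counted in *writes)
// and compaction requests (returned, to be queued after the commit).
std::list<CompactRequest> collect_compactions(const std::vector<Op>& ops,
                                              std::size_t* writes) {
  std::list<CompactRequest> pending;
  *writes = 0;
  for (const Op& op : ops) {
    if (op.type == OpType::Compact) {
      pending.push_back({op.prefix, op.key, op.endkey});
    } else {
      ++*writes;
    }
  }
  return pending;
}
```

The point of the split is ordering: the puts and erases go into one batch, and the compaction requests are only acted on once that batch has been written.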
void RocksDBStore::compact_range_async(const string& start, const string& end)
{
Mutex::Locker l(compact_queue_lock);
// try to merge adjacent ranges. this is O(n), but the queue should
// be short. note that we do not cover all overlap cases and merge
// opportunities here, but we capture the ones we currently need.
// walk compact_queue and log its contents
list< pair<string,string> >::iterator p = compact_queue.begin();
dout(0) <<"<wd_debug90>:compact_quque_size:"<< compact_queue.size()<< dendl;
dout(0) <<"<wd_debug90>:compact_quque_content:"<< "start: "<<start
<<"end: "<< end << dendl;
while (p != compact_queue.end()) {
dout(0) <<"<wd_debug90>:compact_quque_content:"<< "p->first: "<<p->first
<<"p->second: "<< p->second<< dendl;
// an identical range (same start and end) is already queued: return without doing anything
if (p->first == start && p->second == end) {
// dup; no-op
dout(0) <<"<wd_debug2-4>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
return;
}
// a queued range starts inside (start, end], i.e. start < p->first <= end:
// replace it with (start, p->second), keeping its end and moving its start to the left
if (p->first <= end && p->first > start) {
// merge with existing range to the right
compact_queue.push_back(make_pair(start, p->second));
compact_queue.erase(p);
logger->inc(l_rocksdb_compact_queue_merge);
dout(0) <<"<wd_debug2-3>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
break;
}
// a queued range ends inside [start, end), i.e. start <= p->second < end:
// replace it with (p->first, end), keeping its start and moving its end to the right
if (p->second >= start && p->second < end) {
// merge with existing range to the left
compact_queue.push_back(make_pair(p->first, end));
compact_queue.erase(p);
logger->inc(l_rocksdb_compact_queue_merge);
dout(0) <<"<wd_debug2-2>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
break;
}
++p;
}
// nothing was merged: append the new range as its own entry
if (p == compact_queue.end()) {
// no merge, new entry.
compact_queue.push_back(make_pair(start, end));
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
}
compact_queue_cond.Signal();
dout(0) <<"<wd_debug2-1>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
if (!compact_thread.is_started()) {
//if(compact_queue.size() > 10){
dout(0) <<"<wd_debug2>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
compact_thread.create("rstore_compact");
// }
}
}
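The queueing rules in compact_range_async above (drop duplicates, merge with an overlapping entry, otherwise append) can be sketched on their own. This is a simplified model under assumptions: Range, QueueResult and queue_range are hypothetical names, locking and perf counters are left out, and keys are compared as plain strings just as in the original.

```cpp
#include <cassert>
#include <list>
#include <string>
#include <utility>

using Range = std::pair<std::string, std::string>;  // (start, end)

enum class QueueResult { Duplicate, Merged, Appended };

// Add [start, end] to the queue using the same three rules as the
// function above. Comparison is lexicographic on the key strings.
QueueResult queue_range(std::list<Range>& q,
                        const std::string& start,
                        const std::string& end) {
  for (auto p = q.begin(); p != q.end(); ++p) {
    if (p->first == start && p->second == end) {
      return QueueResult::Duplicate;  // already queued, nothing to do
    }
    if (p->first <= end && p->first > start) {
      // queued range begins inside the new one: extend it to the left
      Range merged(start, p->second);
      q.erase(p);
      q.push_back(merged);
      return QueueResult::Merged;
    }
    if (p->second >= start && p->second < end) {
      // queued range ends inside the new one: extend it to the right
      Range merged(p->first, end);
      q.erase(p);
      q.push_back(merged);
      return QueueResult::Merged;
    }
  }
  q.push_back(Range(start, end));  // no overlap found, new entry
  return QueueResult::Appended;
}
```

Only the first matching entry is merged, so, as the upstream comment says, not every overlap case is covered. The merged range is built before the old entry is erased so that the iterator is never touched after erase.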
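Reading the code, this function maintains compact_queue: a range that is already queued is dropped, a range that overlaps an existing entry is merged with it, and anything else is appended. It then signals the compaction thread and, if that thread has not been started yet, starts it with compact_thread.create("rstore_compact");. So I simply commented out compact_thread.create("rstore_compact"); and ran the build on the environment for an hour. Sure enough, the write volume on the system card dropped from 1.7M/s to 500K/s, so the problem was clearly here. Next I printed the call stack that leads to the COMPACT op: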
/usr/bin/ceph-mon(MonitorDBStore::print_stacktrace()+0x36) [0x7f7ee2a17366]
/usr/bin/ceph-mon(MonitorDBStore::apply_transaction(std::shared_ptr<MonitorDBStore::Transaction>)+0x4aa) [0x7f7ee2a1b4fa]
/usr/bin/ceph-mon(Paxos::store_state(MMonPaxos*)+0x96e) [0x7f7ee2b64cee]
/usr/bin/ceph-mon(Paxos::handle_commit(boost::intrusive_ptr<MonOpRequest>)+0x323) [0x7f7ee2b65563]
/usr/bin/ceph-mon(Paxos::dispatch(boost::intrusive_ptr<MonOpRequest>)+0x375) [0x7f7ee2b6d095]
/usr/bin/ceph-mon(Monitor::dispatch_op(boost::intrusive_ptr<MonOpRequest>)+0xec7) [0x7f7ee2a59337]
/usr/bin/ceph-mon(Monitor::_ms_dispatch(Message*)+0x7eb) [0x7f7ee2a59fdb]
/usr/bin/ceph-mon(Monitor::ms_dispatch(Message*)+0x23) [0x7f7ee2a852d3]
/usr/bin/ceph-mon(DispatchQueue::entry()+0x792) [0x7f7ee2f2ee22]
/usr/bin/ceph-mon(DispatchQueue::DispatchThread::entry()+0xd) [0x7f7ee2d2c28d]
/lib64/libpthread.so.0(+0x7df3) [0x7f7ee1a2bdf3]
/lib64/libc.so.6(clone+0x6d) [0x7f7edefba3dd]
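The annoying part is that, judging from this call stack, the COMPACT op goes straight through with no check anywhere along the way. As a stopgap I wrapped compact_thread.create("rstore_compact"); in an if, so that out of every 10 incoming messages only one was acted on. The write volume on the system card did come down with this change, but the fix felt forced, so I agreed with my manager on a two-day extension to find out who was actually sending me the COMPACT op. Then it dawned on me: in mon, all commands are sent by the leader, and the other mons only receive them. I searched the source, and the call path is as follows.
First, /src/mon/Paxos.cc: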
struct C_Committed : public Context {
Paxos *paxos;
explicit C_Committed(Paxos *p) : paxos(p) {}
void finish(int r) override {
assert(r >= 0);
Mutex::Locker l(paxos->mon->lock);
if (paxos->is_shutdown()) {
paxos->abort_commit();
return;
}
paxos->commit_finish();
}
};
This calls commit_finish():
void Paxos::commit_finish()
{
dout(20) << __func__ << " " << (last_committed+1) << dendl;
utime_t end = ceph_clock_now();
logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
assert(g_conf->paxos_kill_at != 8);
// cancel lease - it was for the old value.
// (this would only happen if message layer lost the 'begin', but
// leader still got a majority and committed with out us.)
lease_expire = utime_t(); // cancel lease
last_committed++;
last_commit_time = ceph_clock_now();
// refresh first_committed; this txn may have trimmed.
first_committed = get_store()->get(get_name(), "first_committed");
_sanity_check_store();
//print_stacktrace();
// tell everyone
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == mon->rank) continue;
dout(10) << " sending commit to mon." << *p << dendl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
ceph_clock_now());
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
commit->last_committed = last_committed;
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}
assert(g_conf->paxos_kill_at != 9);
// get ready for a new round.
new_value.clear();
// WRITING -> REFRESH
// among other things, this lets do_refresh() -> mon->bootstrap() know
// it doesn't need to flush the store queue
assert(is_writing() || is_writing_previous());
state = STATE_REFRESH;
assert(commits_started > 0);
--commits_started;
if (do_refresh()) {
commit_proposal();
if (mon->get_quorum().size() > 1) {
extend_lease();
}
finish_contexts(g_ceph_context, waiting_for_commit);
assert(g_conf->paxos_kill_at != 10);
finish_round();
}
}
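Note the line MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, ceph_clock_now());. I searched the whole code base and found this to be the only place where an OP_COMMIT message is created. At the very end, the function calls finish_round: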
void Paxos::finish_round()
{
dout(10) << __func__ << dendl;
assert(mon->is_leader());
// ok, now go active!
state = STATE_ACTIVE;
dout(20) << __func__ << " waiting_for_acting" << dendl;
finish_contexts(g_ceph_context, waiting_for_active);
dout(20) << __func__ << " waiting_for_readable" << dendl;
finish_contexts(g_ceph_context, waiting_for_readable);
dout(20) << __func__ << " waiting_for_writeable" << dendl;
finish_contexts(g_ceph_context, waiting_for_writeable);
dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
int ava = get_version() - get_first_committed();
int ma = g_conf->paxos_min + g_conf->paxos_trim_min;
dout(0) << "<wd_debug901: "<<"available_versions:"<<ava
<<" maximum_versions:"<<ma<< dendl;
if (should_trim()) {
trim();
}
if (is_active() && pending_proposal) {
propose_pending();
}
}
Here is the key part: should_trim() and trim(). After reading into these two functions I finally understood which code controls how often the COMPACT op is issued:
bool should_trim() {
int available_versions = get_version() - get_first_committed();
int maximum_versions = g_conf->paxos_min + g_conf->paxos_trim_min;
if (trimming || (available_versions <= maximum_versions))
return false;
return true;
}
void Paxos::trim()
{
assert(should_trim());
version_t end = MIN(get_version() - g_conf->paxos_min,
get_first_committed() + g_conf->paxos_trim_max);
if (first_committed >= end)
return;
dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
MonitorDBStore::TransactionRef t = get_pending_transaction();
for (version_t v = first_committed; v < end; ++v) {
dout(10) << "trim " << v << dendl;
t->erase(get_name(), v);
}
t->put(get_name(), "first_committed", end);
if (g_conf->mon_compact_on_trim) {
dout(0) << "<wd_debug2>: "<< " compacting trimmed range" << dendl;
t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
}
trimming = true;
queue_pending_finisher(new C_Trimmed(this));
}
To my delight, I found four parameters:
mon_compact_on_trim
paxos_min
paxos_trim_max
paxos_trim_min
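From the code above, mon does not issue a compact in the following four cases:
1. mon_compact_on_trim is false: mon never triggers compaction itself, and compaction is left entirely to rocksdb's own mechanisms.
2. first_committed >= get_version() - paxos_min: the number of versions currently held does not exceed paxos_min, so trim() returns early.
3. first_committed >= get_first_committed() + paxos_trim_max: since both sides start from the same first_committed, this only holds when paxos_trim_max is 0. In practice paxos_trim_max acts as a cap on how many versions a single trim removes.
4. get_version() - get_first_committed() <= paxos_min + paxos_trim_min: the number of versions currently held does not exceed the sum of paxos_min and paxos_trim_min, so should_trim() returns false.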
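The way these four parameters interact can be checked with a small stand-alone model of should_trim() and trim(). This is only a sketch: PaxosTrimConfig, TrimPlan and plan_trim are hypothetical names, last_committed stands in for get_version(), and the model assumes last_committed >= first_committed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical bundle of the four options discussed above.
struct PaxosTrimConfig {
  std::uint64_t paxos_min;
  std::uint64_t paxos_trim_min;
  std::uint64_t paxos_trim_max;
  bool mon_compact_on_trim;
};

struct TrimPlan {
  bool trim;          // would versions be erased?
  bool compact;       // would a compact_range op be added?
  std::uint64_t end;  // new first_committed if trim is true
};

// Mirrors the decision made by should_trim() followed by trim().
TrimPlan plan_trim(std::uint64_t first_committed,
                   std::uint64_t last_committed,
                   bool trimming,
                   const PaxosTrimConfig& c) {
  TrimPlan plan{false, false, first_committed};
  const std::uint64_t available = last_committed - first_committed;
  // should_trim(): skip while a trim is in flight or too few versions exist
  if (trimming || available <= c.paxos_min + c.paxos_trim_min) {
    return plan;
  }
  // trim(): keep at least paxos_min versions, remove at most paxos_trim_max
  const std::uint64_t end = std::min(last_committed - c.paxos_min,
                                     first_committed + c.paxos_trim_max);
  if (first_committed >= end) {
    return plan;
  }
  plan.trim = true;
  plan.compact = c.mon_compact_on_trim;
  plan.end = end;
  return plan;
}
```

For example, with illustrative values paxos_min = 500, paxos_trim_min = 250 and paxos_trim_max = 500 (chosen for the example, not taken from this cluster), first_committed = 100 and last_committed = 800 gives no trim because only 700 versions are available, while last_committed = 900 trims up to version 400.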
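With the analysis done, I immediately raised the numeric values among these parameters tenfold and added prints to the code to check whether rocksdb compact would only run once the trigger condition was met. The prints showed compact running again before the condition had been reached, which suggested that something else in the environment was also triggering compact. A look at the code confirmed it: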
/src/mon/PaxosService.cc
void PaxosService::maybe_trim()
{
if (!is_writeable())
return;
version_t trim_to = get_trim_to();
if (trim_to < get_first_committed())
return;
version_t to_remove = trim_to - get_first_committed();
dout(0) << " wd_debug902: " <<"to_remove:"<<to_remove
<<" paxos_service_trim_min:"<<(version_t)g_conf->paxos_service_trim_min<< dendl;
if (g_conf->paxos_service_trim_min > 0 &&
to_remove < (version_t)g_conf->paxos_service_trim_min) {
dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
<< " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl;
return;
}
if (g_conf->paxos_service_trim_max > 0 &&
to_remove > (version_t)g_conf->paxos_service_trim_max) {
dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
<< " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max
<< dendl;
trim_to = get_first_committed() + g_conf->paxos_service_trim_max;
to_remove = g_conf->paxos_service_trim_max;
}
dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
trim(t, get_first_committed(), trim_to);
put_first_committed(t, trim_to);
// let the service add any extra stuff
encode_trim_extra(t, trim_to);
paxos->trigger_propose();
}
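This calls trim(t, get_first_committed(), trim_to);. Fortunately there are parameters to control it here as well: when to_remove < paxos_service_trim_min, nothing is trimmed and so no compact happens, and once to_remove exceeds paxos_service_trim_min, each round trims at most paxos_service_trim_max versions. Seeing this, I immediately raised these two values tenfold as well. By then it was 11:30 at night, so once the parameters were set I went home to rest.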
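The threshold-and-cap behaviour of maybe_trim() can be modelled the same way. Again this is a sketch under assumptions: ServiceTrimPlan and plan_service_trim are hypothetical names, and the is_writeable() check and the transaction handling are left out.

```cpp
#include <cassert>
#include <cstdint>

struct ServiceTrimPlan {
  bool trim;                // would maybe_trim() go ahead?
  std::uint64_t trim_to;    // version trimmed up to
  std::uint64_t to_remove;  // number of versions removed
};

// Mirrors the checks in maybe_trim(): skip small trims, cap large ones.
// A value of 0 for trim_min or trim_max disables that check, as in the source.
ServiceTrimPlan plan_service_trim(std::uint64_t first_committed,
                                  std::uint64_t trim_to,
                                  std::uint64_t trim_min,
                                  std::uint64_t trim_max) {
  if (trim_to < first_committed) {
    return ServiceTrimPlan{false, first_committed, 0};
  }
  std::uint64_t to_remove = trim_to - first_committed;
  if (trim_min > 0 && to_remove < trim_min) {
    // too little to trim: wait for more versions to accumulate
    return ServiceTrimPlan{false, first_committed, 0};
  }
  if (trim_max > 0 && to_remove > trim_max) {
    // too much for one round: only trim trim_max versions now
    trim_to = first_committed + trim_max;
    to_remove = trim_max;
  }
  return ServiceTrimPlan{true, trim_to, to_remove};
}
```

Raising trim_min makes trims, and the compactions that come with them, less frequent, while raising trim_max lets each trim remove more versions at once.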
When I came back the next day, the write volume averaged 700K/s over six hours. Job done.
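To summarize:
The five cases in which mon does not issue a compact:
1. mon_compact_on_trim is false: mon never triggers compaction itself, and compaction is left entirely to rocksdb's own mechanisms.
2. first_committed >= get_version() - paxos_min: the number of versions currently held does not exceed paxos_min.
3. first_committed >= get_first_committed() + paxos_trim_max: this only holds when paxos_trim_max is 0. Otherwise paxos_trim_max caps how many versions a single trim removes.
4. get_version() - get_first_committed() <= paxos_min + paxos_trim_min: the number of versions currently held does not exceed the sum of paxos_min and paxos_trim_min.
5. to_remove < paxos_service_trim_min: the service-level trim in PaxosService::maybe_trim() is skipped.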