系统卡写入量过大之---ceph触发compaction机制

前言:

终于走到最后一步了,有一种强烈的感觉,只要抑制住了mon主动触发的compaction,系统卡写入量一定会降下来。接下来开始撸代码。下面详细说一下:

思前想后,觉得如果在源码里找到mon下发compaction的地方,把compaction停掉,看一下系统卡写入量会不会明显降低,带着这个目的,在/src/mon/MonitorDBStore.h中找到了如下函数:

int apply_transaction(MonitorDBStore::TransactionRef t) {

  //获取当前事物指针
    KeyValueDB::Transaction dbt = db->get_transaction();

    if (do_dump) {
      if (!g_conf->mon_debug_dump_json) {
        bufferlist bl;
        t->encode(bl);
        bl.write_fd(dump_fd_binary);
      } else {
        t->dump(&dump_fmt, true);
        dump_fmt.flush(dump_fd_json);
        dump_fd_json.flush();
      }
    }

	//创建一个compact链表,名称为compact
    list<pair<string, pair<string,string> > > compact;
	//获取对端发过来的事物类型,执行相关命令
    for (list<Op>::const_iterator it = t->ops.begin();
	 it != t->ops.end();
	 ++it) {
      const Op& op = *it;
      switch (op.type) {
      case Transaction::OP_PUT:   
	dbt->set(op.prefix, op.key, op.bl);
	break;
      case Transaction::OP_ERASE:
	dbt->rmkey(op.prefix, op.key);
	break;
      case Transaction::OP_COMPACT://如果op类型为COMPACT则,则将op携带的"prefix":"key:endkey",放到compact列表后面
      							    //这里的endkey是最早的一个key
	  	lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:7" << dendl;
		this->print_stacktrace();
	compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
	break;
      default:
	derr << __func__ << " unknown op type " << op.type << dendl;
	ceph_abort();
	break;
      }
    }
	//这里进行写数据库,先写在内存里,如果写成功则返回0
    int r = db->submit_transaction_sync(dbt);
    if (r >= 0) {
		//如果compact队列不为空,则进行compact
      while (!compact.empty()) {
	  	lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:8" << dendl;
	if (compact.front().second.first == string() &&
	    compact.front().second.second == string()){
	  db->compact_prefix_async(compact.front().first);
		lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:9" << dendl;
		}
	else{
	  db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
		lsubdout(g_ceph_context, mon, 0)<< "<wd_debug2>:10" << dendl;
	}
	compact.pop_front();
      }
    } else {
      assert(0 == "failed to write to db");
    }
    return r;
  }

由上述代码可以出,如果op类型是OP_COMPACT,则会将ops携带的prefix以及key和endkey填充给compact链表,后面如果判断到compact链表不为空,则会执行db->compact_range_async这个函数。

void RocksDBStore::compact_range_async(const string& start, const string& end)
{
  Mutex::Locker l(compact_queue_lock);

  // try to merge adjacent ranges.  this is O(n), but the queue should
  // be short.  note that we do not cover all overlap cases and merge
  // opportunities here, but we capture the ones we currently need.
  //遍历 compact_queue 队列
  
  //获取一下 compact_queue 队列内容
  list< pair<string,string> >::iterator p = compact_queue.begin();
  dout(0) <<"<wd_debug90>:compact_quque_size:"<< compact_queue.size()<< dendl;
	dout(0) <<"<wd_debug90>:compact_quque_content:"<< "start: "<<start
	<<"end: "<< end << dendl;
  while (p != compact_queue.end()) {
  	dout(0) <<"<wd_debug90>:compact_quque_content:"<< "p->first: "<<p->first
		<<"p->second: "<< p->second<< dendl;
  	//如果compact_queue队列中的某个字段的头和尾 与  本次传入的头和尾一致,则直接退出
    if (p->first == start && p->second == end) {
      // dup; no-op
      dout(0) <<"<wd_debug2-4>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
      return;
    }
	//如果 compact_queue 的头在本次传入的 之间               start < p->first <= end,则将start和当前队列尾巴传入队列
	//其实就是发现 当前头要比            compact_queue 中的头要新,替换一下头。尾部还用 compact_queue中的尾
    if (p->first <= end && p->first > start) {
      // merge with existing range to the right
      compact_queue.push_back(make_pair(start, p->second));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
	  dout(0) <<"<wd_debug2-3>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
      break;
    }
	//如果   end> p->second >=start ,则替换一下尾部,头还用最新的,插入compact_queue队列中
    if (p->second >= start && p->second < end) {
      // merge with existing range to the left
      compact_queue.push_back(make_pair(p->first, end));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
	  dout(0) <<"<wd_debug2-2>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
      break;
    }
    ++p;
  }
  //最后把头和尾插进去
  if (p == compact_queue.end()) {
    // no merge, new entry.
    compact_queue.push_back(make_pair(start, end));
    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
  }
  
  compact_queue_cond.Signal();
  dout(0) <<"<wd_debug2-1>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
  if (!compact_thread.is_started()) {
  	//if(compact_queue.size() > 10){
		dout(0) <<"<wd_debug2>:"<< "compact_queue_len:"<<l_rocksdb_compact_queue_len<< dendl;
    compact_thread.create("rstore_compact");
  //		}
  }
}

看代码可以知道,这里在更新compact_queue队列中的内容,把compact中指定prefix的start和end更新为最新状态。只要本次op携带的prefix不和当前一样,则会直接调用compact_thread.create(“rstore_compact”);。我于是直接把compact_thread.create(“rstore_compact”);给注释掉了,然后放到环境上去跑了一小时,果然系统卡写入量由之前的1.7M/S变为500k/s看来问题就出在这了,接着打出了该函数的调用栈:

/usr/bin/ceph-mon(MonitorDBStore::print_stacktrace()+0x36) [0x7f7ee2a17366]
/usr/bin/ceph-mon(MonitorDBStore::apply_transaction(std::shared_ptr<MonitorDBStore::Transaction>)+0x4aa) [0x7f7ee2a1b4fa]
/usr/bin/ceph-mon(Paxos::store_state(MMonPaxos*)+0x96e) [0x7f7ee2b64cee]
/usr/bin/ceph-mon(Paxos::handle_commit(boost::intrusive_ptr<MonOpRequest>)+0x323) [0x7f7ee2b65563]
/usr/bin/ceph-mon(Paxos::dispatch(boost::intrusive_ptr<MonOpRequest>)+0x375) [0x7f7ee2b6d095]
/usr/bin/ceph-mon(Monitor::dispatch_op(boost::intrusive_ptr<MonOpRequest>)+0xec7) [0x7f7ee2a59337]
/usr/bin/ceph-mon(Monitor::_ms_dispatch(Message*)+0x7eb) [0x7f7ee2a59fdb]
/usr/bin/ceph-mon(Monitor::ms_dispatch(Message*)+0x23) [0x7f7ee2a852d3]
/usr/bin/ceph-mon(DispatchQueue::entry()+0x792) [0x7f7ee2f2ee22]
/usr/bin/ceph-mon(DispatchQueue::DispatchThread::entry()+0xd) [0x7f7ee2d2c28d]
/lib64/libpthread.so.0(+0x7df3) [0x7f7ee1a2bdf3]
/lib64/libc.so.6(clone+0x6d) [0x7f7edefba3dd]

但是这里比较气人的是,从这个函数调用栈看下来COMPACT op一路绿灯,直接COMPACT,后面在compact_thread.create(“rstore_compact”);这里加了个if判断,发过来10次消息,只执行一次。这样修改后系统卡写入量虽然下来了,但是总觉得解决的很牵强,于是和主管商量再延期两天。想找一下到底是谁发给我的COMPACT op。想到这里恍然大悟,想起来,mon所有命令只有leader才会发送,各个从mon都只是接受命令的源码里找了一下代码,调用路径如下:
首先:/src/mon/Paxos.cc

struct C_Committed : public Context {
  Paxos *paxos;
  explicit C_Committed(Paxos *p) : paxos(p) {}
  void finish(int r) override {
    assert(r >= 0);
    Mutex::Locker l(paxos->mon->lock);
    if (paxos->is_shutdown()) {
      paxos->abort_commit();
      return;
    }
    paxos->commit_finish();
  }
};

调用commit_finish();

void Paxos::commit_finish()
{
  dout(20) << __func__ << " " << (last_committed+1) << dendl;
  utime_t end = ceph_clock_now();
  logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);

  assert(g_conf->paxos_kill_at != 8);

  // cancel lease - it was for the old value.
  //  (this would only happen if message layer lost the 'begin', but
  //   leader still got a majority and committed with out us.)
  lease_expire = utime_t();  // cancel lease

  last_committed++;
  last_commit_time = ceph_clock_now();

  // refresh first_committed; this txn may have trimmed.
  first_committed = get_store()->get(get_name(), "first_committed");

  _sanity_check_store();

	//print_stacktrace();
  // tell everyone
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending commit to mon." << *p << dendl;
    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
				      ceph_clock_now());
    commit->values[last_committed] = new_value;
    commit->pn = accepted_pn;
    commit->last_committed = last_committed;

    mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
  }

  assert(g_conf->paxos_kill_at != 9);

  // get ready for a new round.
  new_value.clear();

  // WRITING -> REFRESH
  // among other things, this lets do_refresh() -> mon->bootstrap() know
  // it doesn't need to flush the store queue
  assert(is_writing() || is_writing_previous());
  state = STATE_REFRESH;
  assert(commits_started > 0);
  --commits_started;

  if (do_refresh()) {
    commit_proposal();
    if (mon->get_quorum().size() > 1) {
      extend_lease();
    }

    finish_contexts(g_ceph_context, waiting_for_commit);

    assert(g_conf->paxos_kill_at != 10);

    finish_round();
  }
}

可以看到MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,ceph_clock_now());,搜索了整个代码,只有这一处生成了COMMIT OP,在程序的最后调用finish_round

void Paxos::finish_round()
{
  dout(10) << __func__ << dendl;
  assert(mon->is_leader());

  // ok, now go active!
  state = STATE_ACTIVE;

  dout(20) << __func__ << " waiting_for_acting" << dendl;
  finish_contexts(g_ceph_context, waiting_for_active);
  dout(20) << __func__ << " waiting_for_readable" << dendl;
  finish_contexts(g_ceph_context, waiting_for_readable);
  dout(20) << __func__ << " waiting_for_writeable" << dendl;
  finish_contexts(g_ceph_context, waiting_for_writeable);
  
  dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;

  int ava = get_version() - get_first_committed();
  int ma = g_conf->paxos_min + g_conf->paxos_trim_min;
  
  dout(0) << "<wd_debug901: "<<"available_versions:"<<ava
	  <<" maximum_versions:"<<ma<< dendl;

  if (should_trim()) {
    trim();
  }


  if (is_active() && pending_proposal) {
    propose_pending();
  }
}

可以看到关键部分了,should_trim()和 trim();。这两个函数看进去,终于知道了哪些代码可以控制COMMIT OP的发送频率

  bool should_trim() {
    int available_versions = get_version() - get_first_committed();
    int maximum_versions = g_conf->paxos_min + g_conf->paxos_trim_min;

    if (trimming || (available_versions <= maximum_versions))
      return false;

    return true;
  }



void Paxos::trim()
{
  assert(should_trim());
  version_t end = MIN(get_version() - g_conf->paxos_min,
		      get_first_committed() + g_conf->paxos_trim_max);
 
  if (first_committed >= end)
    return;
  dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
  MonitorDBStore::TransactionRef t = get_pending_transaction();
  for (version_t v = first_committed; v < end; ++v) {
    dout(10) << "trim " << v << dendl;
    t->erase(get_name(), v);
  }
  t->put(get_name(), "first_committed", end);
  if (g_conf->mon_compact_on_trim) {
    dout(0) << "<wd_debug2>: "<< " compacting trimmed range" << dendl;
    t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
  }
  trimming = true;
  queue_pending_finisher(new C_Trimmed(this));
}

欣喜若狂发现四个参数:
mon_compact_on_trim
paxos_min
paxos_trim_max
paxos_trim_min
由上面代码可以分析得到不发生compact发生在如下三种情况下:
1、当mon_compact_on_trim为false时,mon直接不触发compact,compact全权由rocksdb自身机制触发
2、first_committed >= get_version() - paxos_min时,翻译一下就是如果当前版本增长量没有超过paxos_min
3、first_committed >= get_first_committed() + paxos_trim_max);翻译一下就是第一次committed值加上paxos_trim_max仍然等于first_committed时
4、get_version() - get_first_committed() <= paxos_min + paxos_trim_min时,即当前版本差异小于paxos_min+paxos_trim_min之和时
分析完毕后,立即将该四个值分别扩大十倍,然后代码里加了打印,看是不是到了触发条件就会触发rocksdb compact。结果从打印发现,还没到触发条件时compact又执行了,想到环境上可能还有地方在触发compact。看了下代码,发现果然:
/src/mon/PaxosService.cc

void PaxosService::maybe_trim()
{
  if (!is_writeable())
    return;

  version_t trim_to = get_trim_to();
  if (trim_to < get_first_committed())
    return;

  version_t to_remove = trim_to - get_first_committed();
  
   dout(0) << " wd_debug902: " <<"to_remove:"<<to_remove
   	<<" paxos_service_trim_min:"<<(version_t)g_conf->paxos_service_trim_min<< dendl;
   
  if (g_conf->paxos_service_trim_min > 0 &&
      to_remove < (version_t)g_conf->paxos_service_trim_min) {
    dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
	     << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl;
    return;
  }

  if (g_conf->paxos_service_trim_max > 0 &&
      to_remove > (version_t)g_conf->paxos_service_trim_max) {
    dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
	     << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max
	     << dendl;
    trim_to = get_first_committed() + g_conf->paxos_service_trim_max;
    to_remove = g_conf->paxos_service_trim_max;
  }

  dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
  trim(t, get_first_committed(), trim_to);
  put_first_committed(t, trim_to);

  // let the service add any extra stuff
  encode_trim_extra(t, trim_to);

  paxos->trigger_propose();
}

这里会调用trim(t, get_first_committed(), trim_to);,好在这里有有参数可以控制
to_remove < paxos_service_trim_min时不发生compact,后面如果to_remove大于paxos_service_trim_min,则每次最多trim paxos_service_trim_max。看到这立马将这两个值也扩大十倍。这个时候已经晚上十一点半了,参数设置好后回去休息了
第二天过来一看,写入量六小时平均700K/S,大功告成。

总结一下:

mon不发生trim时的五种情况:
1、当mon_compact_on_trim为false时,mon直接不触发compact,compact全权由rocksdb自身机制触发
2、first_committed >= get_version() - paxos_min时,翻译一下就是如果当前版本增长量没有超过paxos_min
3、first_committed >= get_first_committed() + paxos_trim_max);翻译一下就是第一次committed值加上paxos_trim_max仍然等于first_committed时
4、get_version() - get_first_committed() <= paxos_min + paxos_trim_min时,即当前版本差异小于paxos_min+paxos_trim_min之和时
5、to_remove < paxos_service_trim_min时

发布了297 篇原创文章 · 获赞 6 · 访问量 8568

猜你喜欢

转载自blog.csdn.net/qq_23929673/article/details/96155821