osd的多副本读写流程

数据结构

struct pg_pool_t{  

  enum {

    TYPE_REPLICATED = 1,     // 多副本

    //TYPE_RAID4 = 2,   // raid4 (never implemented)

    TYPE_ERASURE = 3,      // EC

  };

  uint64_t flags;            //定义了多种flags,例如FLAG_FULL

  __u8 type;               //多副本或EC

  __u8 size, min_size;      //副本数和至少保证的副本数

  __u8 crush_ruleset;       ///< ruleset的id

  __u8 object_hash;         //映射hash算法

private:

  __u32 pg_num, pgp_num;    ///< number of pgs

};   

class MOSDOp : public Message {

  static const int HEAD_VERSION = 5;

  static const int COMPAT_VERSION = 3;

private:

  uint32_t client_inc;   //client id

  __u32 osdmap_epoch; //版本号

  __u32 flags;   //客户端请求带的flags,例如CEPH_OSD_FLAG_ACK

  utime_t mtime;  //客户端请求所带的mtime

  object_t oid;  //操作的对象

  object_locator_t oloc; //对象的位置信息?

  pg_t pgid;  //对象所在的pgid

public:

  vector<OSDOp> ops; //多个操作集合

……

}  

struct OSDOp {

  ceph_osd_op op;

  sobject_t soid;

  bufferlist indata,outdata;

  int32_t rval;

};

struct ObjectState {

  object_info_t oi;

  bool exists;  

};

struct ObjectContext {

  ObjectState obs;//对象当前的状态

  SnapSetContext *ssc;//快照上下文信息

  Context *destructor_callback;//对象析构的时候快照相关资源释放

private:

  Mutex lock;//内部使用锁

Public:

  Cond cond;//内部控制读写等待,应该是private才对

  int unstable_writes, readers, writers_waiting, readers_waiting;//正在写,等待写,正在读,等待读

  Bool blocked;//copy操作的时候才会用到

  ObjectContextRef blocked_by;//阻塞了本对象写操作的对象

  set<ObjectContextRef> blocking;//本对象阻塞了其他对象写操作的集合

  map<pair<uint64_t, entity_name_t>, WatchRef> watchers;//该对象设置的watcher的集合

  struct RWState {

    enum State {

      RWNONE,

      RWREAD,

      RWWRITE,

      RWEXCL,

    };

State state;//上面3种状态

uint64_t count;//读请求或写请求的次数

list<OpRequestRef> waiters;//因为获取不到锁被挂起的请求

bool recovery_read_marker;//设置了这个状态后,不允许获取写锁

  } rwstate;

};

struct OpContext {

  OpRequestRef op;//请求信息

  osd_reqid_t reqid;

  vector<OSDOp> &ops;//本次req中的ops

  const ObjectState *obs;//老的objectcontext的obs

  const SnapSet *snapset;

  ObjectState new_obs;//最终的对象状态

  SnapSet new_snapset;

  bool modify;//一般为false,ec或UNDIRTY相关操作会为true

  bool user_modify;//默认false,写操作为true

  uint64_t bytes_written, bytes_read;//本次操作读和写的字节数

  utime_t mtime;

  SnapContext snapc;           // writer snap context

  eversion_t at_version;       // pg当前版本和osdmap当前版本

  version_t user_at_version;   // pg当前的user操作版本

  ......

};

概要分析

1.请求通过网络层pipe的reader线程接收到client请求,将请求发送到osd的op_wq队列

2.Osd的op_tp线程池中线程会从op_wq队列中获取请求,并进一步解析请求,如果是读请求,则直接在该线程获取数据,并返回给client。如果是写请求,则根据请求的op打包事务,发送到日志队列writeq。

3.日志线程会从writeq中获取写日志请求,完成后交给日志的finish线程处理。

4.日志的finish线程会将请求丢给filestore的op_wq队列。

5.Filestore的op_tp线程池将数据落盘

6.再通过filestore的finish线程将请求返回,进行回调处理

关键函数

OSD::handle_op

使用场景:1.处理客户端的op请求,在pipe的reader线程中被调用

细节分析:1.message的检查,包括和客户端的连接状态、文件名是否过长、黑名单、写场景下osd是否full状态、单次写入数据量是否过大。

2.OSDService::should_share_map判断是否要通知客户端更新osdmap。

3.根据请求的pgid获取pg信息,并调用OSD::enqueue_op进入osd的op_wq队列

ReplicatedPG::do_op

使用场景:1.处理客户端的op请求,在osd的op_tp线程池中被调用

细节分析:1.对应pg存在于各种非active,当前操作需要放到不同的队列中等待。

2.对于dup操作,通过reqid来查询请求,判断当前请求是否是dup操作。如果是dup操作,并且已经完成,则直接返回成功。

3.根据oid获取objectcontext,从pg缓存中获取objectcontext,如果缓存中不存在,则重新创建一个,保存在缓存中。

4.根据请求op,objectcontext创建opcontext。

5.调用get_rw_locks加锁,该锁主要保证操作在多副本间的一致性。在remove_repop函数的release_op_ctx_locks函数中释放。该锁是不阻塞线程的,请求会被暂存在objectcontext中,后续会重新处理。

6.调用execute_ctx。

ReplicatedPG::execute_ctx

使用场景:1.处理客户端的op请求,在osd的op_tp线程池中被调用

细节分析:1.PgLog版本+1,并保存在opcontext,标记为当前op的pgLog版本号,以及记录请求的mtime。由于整个线程的逻辑处理都在pg_lock中进行,所以保证了整个版本号递增的原子性。

2.如果是读操作会调用ondisk_read_lock获取读锁。该锁保证了对象在主osd节点操作的原子性,由于write请求是先写journal再异步写disk,所以这里如果该对象还未异步刷到disk是不会释放写锁,则本次读会被挂住。该锁是阻塞线程的。

3.调用prepare_transaction。该函数用于处理各种op操作,写操作则打包成事务,读操作则直接从磁盘上读取,创建写、删等操作的pg_log_entry。

4.创建返回给客户端的消息实例MOSDOpReply。并设置返回值。

5.调用calc_trim_to计算需要trim的版本号。min_last_complete_ondisk表示同步到所有osd的pglog的版本号,因此这个版本之后的pglog都是不能够被trim的。pg_log实例中的index_log(pg_log_entry_t),保留了未trim的pg_log_entry_t的信息,并且所有的entry都是有序存放在index_log中。Pg_log只保存osd_min_pg_log_entries多日志,当index_log中size大于这个值时,需要开始trim,并计算出需要trim的条数num_to_trim,通过遍历index_log,trim掉num_to_trim即可,当然如果其中某一个pg_log_entry_t大于min_last_complete_ondisk则不能trim了。这个时候pg的entry可能大于osd_min_pg_log_entries。

6.调用issue_repop。负责将事务发送给各个副本,同时将事务发送给日志处理线程。

7.调用eval_repop。该函数主要负责all_commit(所有副本和本地存储日志完成返回)和all_applied(所有副本和本地存储磁盘返回)的处理,将请求返回给客户端,在这里调用感觉意义不大。

ReplicatedPG::issue_repop

使用场景:1.处理客户端的op请求,在osd的op_tp线程池中被调用

细节分析:1.更新peer_info中各个shard的pginfo的版本信息。

2.对objectcontext加ondisk_lock。对于写操作必须要保证数据写到disk的pagecache中才会释放该锁。

3.创建3个回调。这里要先理解commit和applied的含义,commit是指写完日志返回的情况,applied是指写完disk的pagecache返回的场景。而slave osd的完成写请求返回,在主osd的处理是会先调用commit再调用applied,所以对于副本osd返回的请求其实是applied和commit同时完成。on_all_commit表示所有副本的commit操作都完成了后使用的回调,on_all_applied表示所有副本的applied都完成后使用的回调。onapplied_sync表示本地存储某对象写到disk的pagecache后使用的回调。

4.调用submit_transaction函数

ReplicatedBackend::submit_transaction

使用场景:1.处理客户端的op请求,在osd的op_tp线程池中被调用

细节分析:1.创建InProgressOp实例,并插入到in_progress_ops缓存中。该实例主要用于控制该op的多副本之间的完成情况。waiting_for_commit和waiting_for_applied的set集合初始状态为所有osd对应的pg_shard_t值,当有副本返回则对应的pg_shard_t则会从set集合中删除。当set为空,则调用对应的回调。

2.issue_op负责将数据打包到MSG_OSD_REPOP消息请求中,发送到各个副本上。

3.log_operation负责把本次操作的pglog_entry和pg_info都保存在事务中。计算并保存要trim的pglog_entry到PGLog的trimmed中去。同时把要删除的pglog也打包到事务中去

4.注册3个回调context到op_t。其中C_OSD_OnOpApplied和C_DeleteTransaction是在filestore层数据写入到pagecache中后,通过filestore中的finish线程调用。C_OSD_OnOpApplied用于调用C_OSD_RepopApplied->eval_repop,在eval_repop 的all_applied场景释放相关资源。

总结

从整个osd的数据读写流程来思考数据的并发性和顺序性。可以参考

http://www.sysnote.org/2016/08/29/ceph-io-sequence/

场景一:不同pg的对象操作。相互独立的pg不存在顺序性,且整个流程上看都是可以并发操作的。

场景二:同一个pg的不同对象操作。由于在osd的op_tp线程池处理过程会加pg锁,所以在进入journal队列前是串行的,但这一段逻辑没有io操作,锁的时延并不高。在filestore层会加一个osr->apply_lock.Lock(),这个osr也是一个pg一个实例,所以在filestore层对象读写操作也是串行的,由于写操作都要对应更新pglog_entry,估计是为了保证pglog的一致性。不过好在写完journal,请求已经返回client。

场景三:同一个对象的读写操作。由于在osd的op_tp线程池处理会对objectcontext加disk_lock,所以如果写操作没有在filestore层完成pagecache的写入,是不会释放写锁,从而读操作会阻塞在加读锁上面。

场景四:同一个对象的写写操作。其实和场景二是一样的,filestore层和osd层都有类似的pg锁,来保证同一个pg内的操作是串行化的,在journal层本身就是串行化。

猜你喜欢

转载自my.oschina.net/u/2257799/blog/906485