def loadQueues() { Journal.getQueueNamesFromFolder(path) map { queue(_) } } def queue(name: String): Option[PersistentQueue] = synchronized { if (shuttingDown) { None } else { Some(queues.get(name) getOrElse { // only happens when creating a queue for the first time. val q = if (name contains '+') { val master = name.split('+')(0) fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name log.info("Fanout queue %s added to %s", name, master) buildQueue(master, name, path.getPath) } else { buildQueue(name, name, path.getPath) } q.setup queues(name) = q q }) } } val config = queueConfigMap.getOrElse(name, defaultQueueConfig) log.info("Setting up queue %s: %s", realName, config) new PersistentQueue(realName, path, config, timer, journalSyncTimer, Some(this.apply))
上文中说到Kestrel启动的时候首先实例化一个对象,然后调用QueueCollection.loadQueues();函数,根据上面的代码我们可以看到,这个函数主要是从文件系统中找到所有的队列持久化文件,并调用queue函数。我们发现这个类为每个队列创建创建了一个PersistentQueue类,并调用PersistentQueue类的setup方法,这个方法主要是回放这个队列对应的持久化文件,具体的操作下节介绍。
另外该类中还有一些对队列的操作方法,如add,remove等操作方法。