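Karbor is the data protection project in OpenStack. For data backup, the most important capability is of course the backup itself; the next most important is being able to run backups at a set time and at a regular interval. Karbor performs periodic backups in the following steps: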
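- Create a plan: names the resources to protect and the provider to use
- Create a trigger: a time-based trigger that defines when backups start and the interval between them
- Run a scheduled operation: uses the trigger above to execute the defined plan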
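This document analyzes how Karbor implements scheduled backups. It focuses on two questions: how a created trigger implements the timing, and how the scheduler uses the trigger to execute a plan on schedule.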
Schedule operation
A plan can be scheduled from Horizon or from the command line. The HTTP request issued when scheduling looks like this in the debug log:
    Create a scheduled operation, request body: {
        u'scheduled_operation': {
            u'operation_type': u'protect',
            u'name': u'test',
            u'trigger_id': u'854eda55-d39f-457e-8786-a47ef0b23a39',
            u'operation_definition': {
                u'provider_id': u'cf56bd3e-97a7-4078-b6d5-f36246333fd9',
                u'plan_id': u'31bdb8af-0b47-41e3-a762-5f1ac9f0ad62'
            }
        }
    } create from (pid=17244) /opt/stack/karbor/karbor/api/v1/scheduled_operations.py:89
The log output shows that the request reached the create function in /karbor/karbor/api/v1/scheduled_operations.py. Tracing down from that function leads to OperationEngineManager::create_scheduled_operation in /home/kele/Code/openstack/karbor/karbor/services/operationengine/manager.py. The code of that function is as follows:
    def create_scheduled_operation(self, context, operation):
        LOG.debug("Create scheduled operation.")
        self.operation_manager.check_operation_definition(
            operation.operation_type,
            operation.operation_definition,
        )

        # 1. Register the operation with the trigger
        self.trigger_manager.register_operation(operation.trigger_id,
                                                operation.id)
        trust_id = self.user_trust_manager.add_operation(
            context, operation.id)

        # 2. Create the ScheduledOperationState record
        state_info = {
            "operation_id": operation.id,
            "service_id": self._service_id,
            "trust_id": trust_id,
            "state": constants.OPERATION_STATE_REGISTERED
        }
        operation_state = objects.ScheduledOperationState(
            context, **state_info)
        try:
            operation_state.create()
        except Exception:
            # Undo the registration if the state record cannot be saved
            self.trigger_manager.unregister_operation(
                operation.trigger_id, operation.id)
            raise
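The function above does two main things:
- Registers the operation with the trigger. The body of the HTTP request shows what the operation contains: trigger_id, plan_id, provider_id, and so on.
- Creates the ScheduledOperationState record. If creating the record fails, the operation is unregistered from the trigger again before the exception is re-raised.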
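The register-then-persist order, with the registration undone on failure, can be sketched in isolation. This is a minimal illustration and not Karbor code: `FakeTriggerManager`, `create_with_rollback`, `save_state`, and `failing_save` are hypothetical names introduced only for this example.

```python
class FakeTriggerManager:
    """Hypothetical stand-in for a trigger manager (not Karbor's class)."""

    def __init__(self):
        self.registered = set()

    def register_operation(self, trigger_id, operation_id):
        self.registered.add((trigger_id, operation_id))

    def unregister_operation(self, trigger_id, operation_id):
        self.registered.discard((trigger_id, operation_id))


def create_with_rollback(trigger_manager, trigger_id, operation_id, save_state):
    """Register first, persist second, undo the registration on failure."""
    trigger_manager.register_operation(trigger_id, operation_id)
    try:
        save_state(operation_id)
    except Exception:
        # Without this step the trigger would keep firing an operation
        # that has no state record.
        trigger_manager.unregister_operation(trigger_id, operation_id)
        raise


def failing_save(operation_id):
    """Hypothetical persistence call that always fails."""
    raise RuntimeError("database unavailable")


manager = FakeTriggerManager()
create_with_rollback(manager, "t1", "op1", lambda op_id: None)
try:
    create_with_rollback(manager, "t1", "op2", failing_save)
except RuntimeError:
    pass  # the failure is re-raised after the rollback
```

After both calls only `("t1", "op1")` remains registered, because the failed save for `op2` removed its registration.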
Trigger
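Creating a scheduled operation registers the operation with a trigger, and the trigger does most of the work. Let's look at how the trigger does it, starting from the register_operation call above. This function is a method of the trigger; here we take time_trigger as the example. In TimeTrigger the function is as follows: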
    def register_operation(self, operation_id, **kwargs):
        if operation_id in self._operation_ids:
            msg = (_("The operation_id(%s) is exist") % operation_id)
            raise exception.ScheduledOperationExist(msg)

        if self._greenthread and not self._greenthread.running:
            raise exception.TriggerIsInvalid(trigger_id=self._id)

        self._operation_ids.add(operation_id)
        # Each trigger instance creates one greenthread
        if self._greenthread is None:
            self._start_greenthread()

    def _start_greenthread(self):
        # Find the first time.
        # We don't known when using this trigger first time.
        timer = self._get_timer(self._trigger_property)
        first_run_time = self._compute_next_run_time(
            datetime.utcnow(), self._trigger_property['end_time'], timer)
        if not first_run_time:
            raise exception.TriggerIsInvalid(trigger_id=self._id)

        self._create_green_thread(first_run_time, timer)

    def _create_green_thread(self, first_run_time, timer):
        func = functools.partial(
            self._trigger_operations,
            trigger_property=self._trigger_property.copy(),
            timer=timer)

        self._greenthread = TriggerOperationGreenThread(
            first_run_time, func)
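The body of `_compute_next_run_time` is not shown in this document. To make the idea concrete, here is a hypothetical stand-in that assumes a simple fixed-interval rule: runs happen at `start_time`, `start_time + interval`, and so on, up to an optional `end_time`. The function name, its parameters, and the fixed-interval rule are assumptions made for illustration; Karbor's real timer may work differently.

```python
from datetime import datetime, timedelta


def compute_next_run_time(now, start_time, interval, end_time=None):
    """Return the first scheduled time strictly after `now`, or None.

    Hypothetical fixed-interval rule, not Karbor's implementation.
    """
    if now < start_time:
        next_time = start_time
    else:
        # Whole intervals already elapsed since start_time, plus one.
        elapsed = (now - start_time) // interval
        next_time = start_time + (elapsed + 1) * interval
    if end_time is not None and next_time > end_time:
        return None  # no run left before the end time
    return next_time


start = datetime(2024, 1, 1, 0, 0)
hourly = timedelta(hours=1)
next_run = compute_next_run_time(datetime(2024, 1, 1, 2, 30), start, hourly)
expired = compute_next_run_time(
    datetime(2024, 1, 1, 2, 30), start, hourly,
    end_time=datetime(2024, 1, 1, 2, 45))
```

Here `next_run` is 03:00 on the same day and `expired` is `None`. A `None` result corresponds to the case where `_start_greenthread` raises `TriggerIsInvalid`.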
The TimeTrigger code above ends by instantiating TriggerOperationGreenThread, which creates a green thread. The main logic lives in that class:
    class TriggerOperationGreenThread(object):
        def __init__(self, first_run_time, function):
            super(TriggerOperationGreenThread, self).__init__()
            self._is_sleeping = True
            self._pre_run_time = None
            self._running = False
            self._thread = None

            self._function = function

            self._start(first_run_time)

        def _start(self, first_run_time):
            self._running = True

            now = datetime.utcnow()
            # Compute the delay before the thread starts
            initial_delay = 0 if first_run_time <= now else (
                int(timeutils.delta_seconds(now, first_run_time)))

            # Use eventlet's spawn_after to start a thread once the
            # delay computed above has passed
            self._thread = eventlet.spawn_after(
                initial_delay, self._run, first_run_time)
            self._thread.link(self._on_done)

        def _run(self, expect_run_time):
            while self._running:
                self._is_sleeping = False
                self._pre_run_time = expect_run_time

                # Run the protect operation and get the next expected
                # run time
                expect_run_time = self._function(expect_run_time)
                if expect_run_time is None or not self._running:
                    break

                self._is_sleeping = True

                now = datetime.utcnow()
                # Compute how long to wait until the next run
                idle_time = 0 if expect_run_time <= now else int(
                    timeutils.delta_seconds(now, expect_run_time))
                eventlet.sleep(idle_time)

The code above first creates a thread and then runs the protection task in a loop inside it. After each run the thread sleeps, and the sleep duration is the time remaining between the end of that run and the next expected run time; if that time has already passed, the next run starts immediately. This is how Karbor runs a protection plan on a schedule. The data protection flow itself is not analyzed here.
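The same run, compute, sleep loop can be sketched with only the standard library. This is an illustration under stated assumptions, not Karbor's implementation: it uses an OS thread instead of an eventlet green thread, waits inside the loop instead of calling `spawn_after`, and the names `run_periodically`, `fake_protect`, and `stop` are hypothetical.

```python
import threading
import time


def run_periodically(function, first_run_time, stop_event, clock=time.monotonic):
    """Call `function` at each expected time until it returns None."""
    expect_run_time = first_run_time
    while not stop_event.is_set():
        # Wait only for the time that remains until the expected run time;
        # a late run starts immediately (idle_time is clamped to 0).
        idle_time = max(0.0, expect_run_time - clock())
        if stop_event.wait(idle_time):
            break  # stopped while waiting
        # The task returns the next expected run time, or None to finish.
        expect_run_time = function(expect_run_time)
        if expect_run_time is None:
            break


runs = []


def fake_protect(expect_run_time):
    """Hypothetical task: record the run, then schedule the next one."""
    runs.append(expect_run_time)
    return expect_run_time + 0.01 if len(runs) < 3 else None


stop = threading.Event()
worker = threading.Thread(
    target=run_periodically,
    args=(fake_protect, time.monotonic() + 0.01, stop))
worker.start()
worker.join(timeout=5)
```

Waiting on `stop_event` instead of calling a plain sleep lets the loop be interrupted while idle. The expected run times advance from the previous expected time rather than from the actual finish time, so a slow run does not shift every later run.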