OpenStack Ceilometer, Part 3: Source Code Analysis of notification
1. Startup Command
exec ceilometer-agent-notification \
--config-file /etc/ceilometer/ceilometer.conf
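2. Code Entry Point
Ceilometer's code is packaged with setuptools and pbr. For background, see:
Software packaging with setuptools and pbr in OpenStack
The entry point is the main function in /ceilometer/cmd/agent_notification.py: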
def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(notification.NotificationService,
           workers=CONF.notification.workers)
    sm.run()
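This part is built on cotyledon's multi-process service framework. The ServiceManager starts CONF.notification.workers worker processes, and each one runs a NotificationService.
Let's go straight to the run method of the NotificationService class.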
3. Service Startup
Service startup happens in NotificationService.run:
def run(self):
    super(NotificationService, self).run()
    self.shutdown = False
    self.periodic = None
    self.partition_coordinator = None
    self.coord_lock = threading.Lock()
    self.listeners = []
    # NOTE(kbespalov): for the pipeline queues used a single amqp host
    # hence only one listener is required
    self.pipeline_listener = None
    """
    (Pdb) self.pipeline_manager.__dict__
    {
        'cfg_mtime': 1592551325.2732353,
        'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a',
        'cfg_loc': '/etc/ceilometer/pipeline.yaml',
        'pipelines': [<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3950>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3a10>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e37d0>]
    }
    (Pdb) self.pipeline_manager.pipelines[0].__dict__
    {
        'source': <ceilometer.pipeline.SampleSource object at 0x7fa17405e050>,
        'sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>,
        'name': 'notification_source:notification_sink'
    }
    (Pdb) self.pipeline_manager.pipelines[0].source.__dict__
    {
        'name': 'notification_source',
        'cfg': {
            'interval': 3600,
            'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
            'name': 'notification_source',
            'sinks': ['notification_sink']
        },
        'interval': 3600,
        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
        'discovery': [],
        'resources': [],
        'sinks': ['notification_sink']
    }
    (Pdb) self.pipeline_manager.pipelines[0].sink.__dict__
    {
        'publishers': [<ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa17405e410>],
        'transformers': [],
        'name': 'notification_sink',
        'cfg': {
            'publishers': ['notifier://'],
            'transformers': None,
            'name': 'notification_sink'
        },
        'multi_publish': False,
        'transformer_cfg': []
    }
    """
    self.pipeline_manager = pipeline.setup_pipeline()
    self.event_pipeline_manager = pipeline.setup_event_pipeline()
    self.transport = messaging.get_transport()
    if cfg.CONF.notification.workload_partitioning:
        self.group_id = self.NOTIFICATION_NAMESPACE
        self.partition_coordinator = coordination.PartitionCoordinator()
        self.partition_coordinator.start()
    else:
        # FIXME(sileht): endpoint uses the notification_topics option
        # and it should not because this is an oslo_messaging option
        # not a ceilometer. Until we have something to get the
        # notification_topics in another way, we must create a transport
        # to ensure the option has been registered by oslo_messaging.
        messaging.get_notifier(self.transport, '')
        self.group_id = None
    # This function checks whether workload partitioning is enabled. If it is,
    # it returns a SamplePipelineTransportManager instance in place of the
    # PipelineManager instance; otherwise it returns the PipelineManager instance.
    # The two differ in how their publisher is implemented:
    # SamplePipelineTransportManager's publisher sends the data back to the
    # message queue, from which it is later consumed, processed, and then
    # sent to gnocchi-api.
    # PipelineManager's publisher sends the data directly to the gnocchi-api service.
    self.pipe_manager = self._get_pipe_manager(self.transport,
                                               self.pipeline_manager)
    self.event_pipe_manager = self._get_event_pipeline_manager(
        self.transport)
    self._configure_main_queue_listeners(self.pipe_manager,
                                         self.event_pipe_manager)
    if cfg.CONF.notification.workload_partitioning:
        ......
    if not cfg.CONF.notification.disable_non_metric_meters:
        LOG.warning(_LW('Non-metric meters may be collected. It is highly '
                        'advisable to disable these meters using '
                        'ceilometer.conf or the pipeline.yaml'))
    self.init_pipeline_refresh()
I printed some of the parameters in the code above with pdb to make things more concrete. Let's walk through the startup process step by step:
- The opening statements, down to self.pipeline_listener = None, mainly define and initialize attributes.
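- Next come self.pipeline_manager = pipeline.setup_pipeline() and self.event_pipeline_manager = pipeline.setup_event_pipeline(). Both follow the same logic, so let's analyze setup_pipeline: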
def setup_pipeline(transformer_manager=None):
    """Setup pipeline manager according to yaml config file."""
    default = extension.ExtensionManager('ceilometer.transformer')
    cfg_file = cfg.CONF.pipeline_cfg_file
    return PipelineManager(cfg_file,
                           transformer_manager or default,
                           SAMPLE_TYPE)
- extension.ExtensionManager('ceilometer.transformer') loads the plugins registered in the ceilometer.transformer namespace:
ceilometer.transformer =
    accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
    delta = ceilometer.transformer.conversions:DeltaTransformer
    unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
    rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
    aggregator = ceilometer.transformer.conversions:AggregatorTransformer
    arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
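To make the plugin loading more concrete, here is a minimal sketch of what resolving `name = module:Attr` entry-point lines comes down to. It is only a stand-in for stevedore. `parse_entry_points` and `load_extensions` are hypothetical helpers that parse the lines by hand and import the targets with `importlib`, and standard-library classes take the place of the ceilometer plugins. The optional instantiation step follows the same idea as stevedore's `invoke_on_load`/`invoke_args` options.

```python
import importlib


def parse_entry_points(spec):
    """Parse 'name = module.path:Attr' lines into {name: (module, attr)}.

    Hypothetical helper that mimics the entry-point syntax shown above.
    """
    result = {}
    for line in spec.strip().splitlines():
        name, _, target = line.partition('=')
        module_path, _, attr = target.strip().partition(':')
        result[name.strip()] = (module_path, attr)
    return result


def load_extensions(spec, invoke_on_load=False, invoke_args=()):
    """Import each entry point and, if requested, instantiate it."""
    loaded = {}
    for name, (module_path, attr) in parse_entry_points(spec).items():
        plugin = getattr(importlib.import_module(module_path), attr)
        loaded[name] = plugin(*invoke_args) if invoke_on_load else plugin
    return loaded


# Standard-library classes stand in for ceilometer transformer plugins
SPEC = """
counter = collections:Counter
deque = collections:deque
"""

plugins = load_extensions(SPEC)
print(sorted(plugins))  # ['counter', 'deque']

instances = load_extensions(SPEC, invoke_on_load=True,
                            invoke_args=(['a', 'a', 'b'],))
print(instances['counter']['a'])  # 2
```

In ceilometer itself, these lines live in setup.cfg and are discovered through the installed package metadata. The sketch therefore covers only the mapping from name to class, not the discovery step.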
- cfg.CONF.pipeline_cfg_file points to pipeline.yaml, and that file is used to initialize a PipelineManager.
Here is what PipelineManager's initialization does. Again, I added pdb output to make it more concrete:
class PipelineManager(ConfigManagerBase):

    def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
        super(PipelineManager, self).__init__()
        cfg = self.load_config(cfg_info)
        self.pipelines = []
        if not ('sources' in cfg and 'sinks' in cfg):
            raise PipelineException("Both sources & sinks are required",
                                    cfg)
        LOG.info(_LI('detected decoupled pipeline config format'))
        unique_names = set()
        sources = []
        """
        pipeline.yaml
        (Pdb) cfg.get('sources')
        [{
            'interval': 3600,
            'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
            'name': 'notification_source',
            'sinks': ['notification_sink']
        }, {
            'interval': 300,
            'meters': ['poll.*', 'memory.usage', 'memory.util'],
            'name': 'meter_source',
            'sinks': ['meter_sink']
        }]
        event_pipeline.yaml
        (Pdb) cfg.get('sources')
        [{
            'sinks': ['event_sink'],
            'name': 'event_source',
            'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
        }]
        """
        for s in cfg.get('sources'):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated source names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
                """
                pipeline.yaml
                (Pdb) sources
                [<ceilometer.pipeline.SampleSource object at 0x7fa17405e050>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e110>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e090>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e190>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e250>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e210>,
                 <ceilometer.pipeline.SampleSource object at 0x7fa17405e310>]
                (Pdb) sources[0].__dict__
                {
                    'name': 'notification_source',
                    'cfg': {
                        'interval': 3600,
                        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                        'name': 'notification_source',
                        'sinks': ['notification_sink']
                    },
                    'interval': 3600,
                    'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                    'discovery': [],
                    'resources': [],
                    'sinks': ['notification_sink']
                }
                (Pdb) sources[1].__dict__
                {
                    'name': 'meter_source',
                    'cfg': {
                        'interval': 300,
                        'meters': ['poll.*', 'memory.usage', 'memory.util'],
                        'name': 'meter_source',
                        'sinks': ['meter_sink']
                    },
                    'interval': 300,
                    'meters': ['poll.*', 'memory.usage', 'memory.util'],
                    'discovery': [],
                    'resources': [],
                    'sinks': ['meter_sink']
                }
                event_pipeline.yaml
                {
                    'cfg': {
                        'sinks': ['event_sink'],
                        'name': 'event_source',
                        'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
                    },
                    'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists'],
                    'name': 'event_source',
                    'sinks': ['event_sink']
                }
                """
                sources.append(p_type['source'](s))
        unique_names.clear()

        sinks = {}
        """
        pipeline.yaml
        (Pdb) cfg.get('sinks')
        [{
            'publishers': ['notifier://'],
            'transformers': None,
            'name': 'notification_sink'
        }, {
            'publishers': ['notifier://'],
            'transformers': None,
            'name': 'meter_sink'
        }]
        event_pipeline.yaml
        (Pdb) cfg.get('sinks')
        [{'publishers': ['notifier://'], 'transformers': None, 'name': 'event_sink'}]
        """
        for s in cfg.get('sinks'):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated sink names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
                """
                (Pdb) sinks
                {'network_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741e3350>,
                 'volume_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbfd0>,
                 'disk_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbc50>,
                 'cpu_delta_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e290>,
                 'cpu_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e390>,
                 'meter_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e3d0>,
                 'notification_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>}
                (Pdb) sinks.get('network_sink').__dict__
                {
                    'publishers': [<ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa1741e3890>],
                    'transformers': [<ceilometer.transformer.conversions.RateOfChangeTransformer object at 0x7fa1741e3710>],
                    'name': 'network_sink',
                    'cfg': {
                        'publishers': ['notifier://'],
                        'transformers': [{
                            'name': 'rate_of_change',
                            'parameters': {
                                'source': {
                                    'map_from': {
                                        'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                        'unit': '(B|packet)'
                                    }
                                },
                                'target': {
                                    'map_to': {
                                        'name': 'network.\\1.\\2.rate',
                                        'unit': '\\1/s'
                                    },
                                    'type': 'gauge'
                                }
                            }
                        }],
                        'name': 'network_sink'
                    },
                    'multi_publish': False,
                    'transformer_cfg': [{
                        'name': 'rate_of_change',
                        'parameters': {
                            'source': {
                                'map_from': {
                                    'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                    'unit': '(B|packet)'
                                }
                            },
                            'target': {
                                'map_to': {
                                    'name': 'network.\\1.\\2.rate',
                                    'unit': '\\1/s'
                                },
                                'type': 'gauge'
                            }
                        }
                    }]
                }
                """
                sinks[s['name']] = p_type['sink'](s, transformer_manager)
        unique_names.clear()

        for source in sources:
            source.check_sinks(sinks)
            for target in source.sinks:
                pipe = p_type['pipeline'](source, sinks[target])
                if pipe.name in unique_names:
                    raise PipelineException(
                        "Duplicate pipeline name: %s. Ensure pipeline"
                        " names are unique. (name is the source and sink"
                        " names combined)" % pipe.name, cfg)
                else:
                    unique_names.add(pipe.name)
                    self.pipelines.append(pipe)
        unique_names.clear()

    def publisher(self):
        """Build a new Publisher for these manager pipelines.

        :param context: The context.
        """
        return PublishContext(self.pipelines)
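- The for s in cfg.get('sources') loop reads the sources from pipeline.yaml, instantiates each one through the p_type['source'] class, and appends it to the sources list. So what is p_type['source']? As shown below, it is the SampleSource class. That class simply parses each source's collection interval (interval) and the meters it collects (meters) from the configuration file.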
SAMPLE_TYPE = {'pipeline': SamplePipeline,
               'source': SampleSource,
               'sink': SampleSink}

EVENT_TYPE = {'pipeline': EventPipeline,
              'source': EventSource,
              'sink': EventSink}
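PipelineManager.__init__ pairs each source with every sink it lists, names the result `source:sink`, and checks for duplicates. The sketch below condenses that logic. It uses plain dicts instead of SampleSource/SampleSink objects and replaces `check_sinks` with a simple membership test. `build_pipelines` and `PipelineError` are hypothetical names, not ceilometer code.

```python
class PipelineError(Exception):
    """Hypothetical stand-in for ceilometer's PipelineException."""


def _check_unique(items, kind):
    seen = set()
    for item in items:
        if item['name'] in seen:
            raise PipelineError('Duplicated %s names: %s' % (kind, item['name']))
        seen.add(item['name'])


def build_pipelines(cfg):
    """Pair every source with each sink it lists, like PipelineManager."""
    if not ('sources' in cfg and 'sinks' in cfg):
        raise PipelineError('Both sources & sinks are required')
    _check_unique(cfg['sources'], 'source')
    _check_unique(cfg['sinks'], 'sink')
    sinks = {s['name']: s for s in cfg['sinks']}

    pipelines, names = [], set()
    for source in cfg['sources']:
        for sink_name in source['sinks']:
            # Simplified version of the check_sinks() validation
            if sink_name not in sinks:
                raise PipelineError('Source %s references unknown sink %s'
                                    % (source['name'], sink_name))
            name = '%s:%s' % (source['name'], sink_name)
            if name in names:
                raise PipelineError('Duplicate pipeline name: %s' % name)
            names.add(name)
            pipelines.append({'name': name, 'source': source,
                              'sink': sinks[sink_name]})
    return pipelines


# Trimmed-down version of the pipeline.yaml content printed with pdb above
cfg = {
    'sources': [
        {'name': 'notification_source', 'interval': 3600,
         'meters': ['instance', 'volume'], 'sinks': ['notification_sink']},
        {'name': 'meter_source', 'interval': 300,
         'meters': ['poll.*', 'memory.usage'], 'sinks': ['meter_sink']},
    ],
    'sinks': [
        {'name': 'notification_sink', 'publishers': ['notifier://']},
        {'name': 'meter_sink', 'publishers': ['notifier://']},
    ],
}
print([p['name'] for p in build_pipelines(cfg)])
# ['notification_source:notification_sink', 'meter_source:meter_sink']
```

Because each pipeline is keyed by `source:sink`, one source that lists two sinks produces two pipelines, and the same sink can be shared by several sources.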
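- Likewise, the for s in cfg.get('sinks') loop reads the sinks from the configuration file and instantiates each one with the SampleSink class. SampleSink has no __init__ of its own, so its base class Sink handles initialization: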
class Sink(object):

    def __init__(self, cfg, transformer_manager):
        self.cfg = cfg
        try:
            self.name = cfg['name']
            # It's legal to have no transformer specified
            self.transformer_cfg = cfg.get('transformers') or []
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)
        if not cfg.get('publishers'):
            raise PipelineException("No publisher specified", cfg)
        self.publishers = []
        for p in cfg['publishers']:
            if '://' not in p:
                # Support old format without URL
                p = p + "://"
            try:
                self.publishers.append(publisher.get_publisher(
                    p, self.NAMESPACE))
            except Exception:
                LOG.exception(_("Unable to load publisher %s"), p)
        self.multi_publish = True if len(self.publishers) > 1 else False
        self.transformers = self._setup_transformers(cfg, transformer_manager)
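The two key calls here are publisher.get_publisher(p, self.NAMESPACE) and self._setup_transformers(cfg, transformer_manager):
- get_publisher mainly loads the notifier plugin from the ceilometer.publisher namespace, matching the configured notifier:// publisher.
- _setup_transformers receives transformer_manager, which holds all plugins of the ceilometer.transformer namespace. It iterates over the transformers configured for the sink and instantiates a plugin object based on each transformer's name. For example, the name rate_of_change yields an instance of the rate_of_change plugin from ceilometer.transformer. The resulting objects are stored in the transformers list.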
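The publisher handling in Sink.__init__ can be tried out in isolation with the sketch below. It assumes the URL scheme identifies the plugin in the ceilometer.publisher namespace, so `notifier://` maps to `notifier`. `normalize_publishers` is a hypothetical helper built only on `urllib.parse.urlsplit`. It does not load any plugin, and the `udp://` URL is just an example value.

```python
from urllib.parse import urlsplit


def normalize_publishers(publishers):
    """Return ([(scheme, url), ...], multi_publish) for a sink's publishers."""
    if not publishers:
        raise ValueError('No publisher specified')
    normalized = []
    for p in publishers:
        if '://' not in p:
            # Support the old format without a URL, as Sink.__init__ does
            p = p + '://'
        normalized.append((urlsplit(p).scheme, p))
    multi_publish = len(normalized) > 1
    return normalized, multi_publish


print(normalize_publishers(['notifier']))
# ([('notifier', 'notifier://')], False)
print(normalize_publishers(['notifier://', 'udp://10.0.0.1:4952']))
# ([('notifier', 'notifier://'), ('udp', 'udp://10.0.0.1:4952')], True)
```

Unlike the real code, the sketch does not model a failed plugin load, so multi_publish counts every configured URL rather than only the publishers that loaded successfully.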
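- Back in PipelineManager, the final for source in sources loop walks over the sources and sinks built above. For each source, it takes each of the source's sinks, builds a SamplePipeline from the pair, and appends it to self.pipelines. SamplePipeline has no __init__ method, so the base class Pipeline's __init__ is called: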
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
    """Represents a coupling between a sink and a corresponding source."""

    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        self.name = str(self)
- The data finally stored on the objects is:
pipeline.yaml
(Pdb) self.pipelines
[<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>,
 <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>,
 <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>,
 <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>]
(Pdb) self.pipelines[0].__dict__
{'source': <ceilometer.pipeline.SampleSource object at 0x7fc20c082650>,
 'sink': <ceilometer.pipeline.SampleSink object at 0x7fc20c082050>,
 'name': 'notification_source:notification_sink'}

event_pipeline.yaml
(Pdb) self.pipelines
[<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
(Pdb) self.pipelines[0].__dict__
{'source': <ceilometer.pipeline.EventSource object at 0x7fa1741e3dd0>,
 'sink': <ceilometer.pipeline.EventSink object at 0x7fa1741e3ed0>,
 'name': 'event:event_source:event_sink'}
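- Back to NotificationService.run: setup_pipeline() is now covered, and setup_event_pipeline() follows the same logic.
- The _get_pipe_manager and _get_event_pipeline_manager calls are explained in the comments inside the code.
- The last key step is the call to _configure_main_queue_listeners: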
def _configure_main_queue_listeners(self, pipe_manager,
                                    event_pipe_manager):
    """
    (Pdb) notification_manager.extensions[0].__dict__
    {
        'obj': <ceilometer.network.notifications.Firewall object at 0x7fa1741e3d50>,
        'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.notifications:Firewall'),
        'name': 'network.services.firewall',
        'plugin': <class 'ceilometer.network.notifications.Firewall'>
    }
    """
    notification_manager = self._get_notifications_manager(pipe_manager)
    if not list(notification_manager):
        LOG.warning(_('Failed to load any notification handlers for %s'),
                    self.NOTIFICATION_NAMESPACE)
    ack_on_error = cfg.CONF.notification.ack_on_event_error
    endpoints = []
    """
    (Pdb) endpoints
    [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>]
    (Pdb) endpoints[0].__dict__
    {
        'event_converter': <ceilometer.event.converter.NotificationEventsConverter object at 0x7fa17467c050>,
        'manager': <ceilometer.pipeline.PipelineManager object at 0x2f6d550>
    }
    (Pdb) endpoints[0].manager.__dict__
    {
        'cfg_mtime': 1592551325.2732353,
        'cfg_hash': 'c01e8ee1c40b66314628536afcd48a39',
        'cfg_loc': '/etc/ceilometer/event_pipeline.yaml',
        'pipelines': [<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
    }
    """
    endpoints.append(
        event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
    targets = []
    for ext in notification_manager:
        handler = ext.obj
        """
        (Pdb) cfg.CONF.oslo_messaging_notifications.topics
        ['notifications']
        (Pdb) cfg.CONF.notification.disable_non_metric_meters
        True
        """
        if (cfg.CONF.notification.disable_non_metric_meters and
                isinstance(handler, base.NonMetricNotificationBase)):
            continue
        LOG.debug('Event types from %(name)s: %(type)s'
                  ' (ack_on_error=%(error)s)',
                  {'name': ext.name,
                   'type': ', '.join(handler.event_types),
                   'error': ack_on_error})
        # NOTE(gordc): this could be a set check but oslo_messaging issue
        # https://bugs.launchpad.net/oslo.messaging/+bug/1398511
        # This ensures we don't create multiple duplicate consumers.
        for new_tar in handler.get_targets(cfg.CONF):
            if new_tar not in targets:
                targets.append(new_tar)
        endpoints.append(handler)
    """
    (Pdb) endpoints
    [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
     <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
     <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
     <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
     <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
     <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
     <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
    (Pdb) cfg.CONF.notification.messaging_urls
    ['rabbit://rabbitmq:<password>@<host>:5672/']
    """
    urls = cfg.CONF.notification.messaging_urls or [None]
    for url in urls:
        transport = messaging.get_transport(url)
        # NOTE(gordc): ignore batching as we want pull
        # to maintain sequencing as much as possible.
        listener = messaging.get_batch_notification_listener(
            transport, targets, endpoints)
        listener.start()
        self.listeners.append(listener)
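- self._get_notifications_manager(pipe_manager) mainly loads the plugins of the ceilometer.notification namespace, passing pipe_manager as the argument used when each extension is loaded and instantiated.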
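- endpoints.append(event_endpoint.EventsNotificationEndpoint(event_pipe_manager)) registers the endpoint that handles events triggered by other components. We will come back to it later.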
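- The for ext in notification_manager loop iterates over the plugins just loaded. If disable_non_metric_meters is enabled and the plugin object is an instance of NonMetricNotificationBase, the plugin is skipped. Otherwise, the plugin's get_targets method is called to build oslo_messaging.Target instances whose topic is notifications. Each target not already collected is added to targets, and the plugin itself is appended to endpoints.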
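The filtering and de-duplication in that loop can be reproduced with the hypothetical stand-ins below. `Target` is a dataclass compared by value, which is unhashable because it defines `eq` without `frozen`. `Handler` carries a `non_metric` flag in place of the NonMetricNotificationBase isinstance check. This is a sketch under those assumptions, not ceilometer code. It shows that the list-based `new_tar not in targets` check only needs equality, so it also works for objects that cannot be put into a set.

```python
from dataclasses import dataclass


@dataclass
class Target:
    """Hypothetical stand-in for oslo_messaging.Target.

    A plain @dataclass defines __eq__ but sets __hash__ to None,
    so instances cannot be put into a set.
    """
    exchange: str
    topic: str


class Handler:
    """Hypothetical notification plugin exposing get_targets()."""

    def __init__(self, exchanges, non_metric=False):
        self.exchanges = exchanges
        self.non_metric = non_metric  # replaces the isinstance() check

    def get_targets(self, topics=('notifications',)):
        return [Target(e, t) for e in self.exchanges for t in topics]


def collect(handlers, disable_non_metric_meters=True):
    targets, endpoints = [], []
    for handler in handlers:
        if disable_non_metric_meters and handler.non_metric:
            continue
        for new_tar in handler.get_targets():
            if new_tar not in targets:  # list membership needs only __eq__
                targets.append(new_tar)
        endpoints.append(handler)
    return targets, endpoints


handlers = [Handler(['nova', 'cinder']), Handler(['nova']),
            Handler(['neutron'], non_metric=True)]
targets, endpoints = collect(handlers)
print([t.exchange for t in targets], len(endpoints))
# ['nova', 'cinder'] 2
```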
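- The code from urls = cfg.CONF.notification.messaging_urls or [None] to the end is the final step in starting the service. First, let's see what urls contains: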
(Pdb) cfg.CONF.notification.messaging_urls
['rabbit://rabbitmq:<password>@<host>:5672/']
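Communication goes through RabbitMQ. Next, get_batch_notification_listener is called with the targets, which define which messages to receive, and the endpoints, which handle those messages once they arrive. As we know, these endpoints come in two kinds: one collects metering data and the other collects events. Let's see what happens inside: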
def get_batch_notification_listener(transport, targets, endpoints,
                                    allow_requeue=False,
                                    batch_size=1, batch_timeout=None):
    """
    (Pdb) a
    transport = <oslo_messaging.transport.Transport object at 0x7fab5c6a9190>
    targets = [<Target exchange=ironic, topic=notifications>,
               <Target exchange=ceilometer, topic=notifications>,
               <Target exchange=nova, topic=notifications>,
               <Target exchange=cinder, topic=notifications>,
               <Target exchange=glance, topic=notifications>,
               <Target exchange=neutron, topic=notifications>,
               <Target exchange=heat, topic=notifications>,
               <Target exchange=keystone, topic=notifications>,
               <Target exchange=sahara, topic=notifications>,
               <Target exchange=trove, topic=notifications>,
               <Target exchange=zaqar, topic=notifications>,
               <Target exchange=swift, topic=notifications>,
               <Target exchange=magnum, topic=notifications>,
               <Target exchange=central, topic=notifications>]
    endpoints = [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x7fab5c258c90>,
                 <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fab5c2a9610>,
                 <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fab5c2b8110>,
                 <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fab5c2b8510>,
                 <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fab5c2b8810>,
                 <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fab5c2b8a90>,
                 <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fab5c6be210>]
    allow_requeue = False
    batch_size = 1
    batch_timeout = None
    """
    return oslo_messaging.get_batch_notification_listener(
        transport, targets, endpoints, executor='threading',
        allow_requeue=allow_requeue,
        batch_size=batch_size, batch_timeout=batch_timeout)
It simply delegates to oslo_messaging.get_batch_notification_listener, so let's keep digging:
def get_batch_notification_listener(transport, targets, endpoints,
                                    executor='blocking', serializer=None,
                                    allow_requeue=False, pool=None,
                                    batch_size=None, batch_timeout=None):
    dispatcher = notify_dispatcher.BatchNotificationDispatcher(
        endpoints, serializer)
    return BatchNotificationServer(
        transport, targets, dispatcher, executor, allow_requeue, pool,
        batch_size, batch_timeout
    )
BatchNotificationDispatcher has no __init__ of its own, so the base class NotificationDispatcher's __init__ is called:
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']


class NotificationDispatcher(dispatcher.DispatcherBase):
    def __init__(self, endpoints, serializer):
        self.endpoints = endpoints
        self.serializer = serializer or msg_serializer.NoOpSerializer()
        self._callbacks_by_priority = {}
        for endpoint, prio in itertools.product(endpoints, PRIORITIES):
            if hasattr(endpoint, prio):
                method = getattr(endpoint, prio)
                screen = getattr(endpoint, 'filter_rule', None)
                self._callbacks_by_priority.setdefault(prio, []).append(
                    (screen, method))
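This uses itertools.product(A, B), which yields the Cartesian product of the elements of A and B as tuples. For each (endpoint, priority) pair, if the endpoint has an attribute named after that priority, the (filter, method) pair is stored in _callbacks_by_priority, roughly in this format: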
'sample': [
    (<oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c254590>,
     <bound method TemperatureSensorNotification.sample of <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7ff14c254550>>),
    (<oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c260590>,
     <bound method TelemetryIpc.sample of <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7ff14c260150>>)
]
Each element of the list is a (screen, method) tuple, where screen is the plugin's filter_rule attribute. Take TelemetryIpc as an example. Its base class is NotificationBase:
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
    """Base class for plugins that support the notification API."""
    def __init__(self, manager):
        super(NotificationBase, self).__init__()
        # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
        # messages to an endpoint.
        if self.event_types:
            self.filter_rule = oslo_messaging.NotificationFilter(
                event_type='|'.join(self.event_types))
        self.manager = manager
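NotificationFilter filters the messages received by the notification service. It can match on context, publisher_id, event_type, metadata, and payload. Here it is built from the plugin's event_types joined with |, so an endpoint's callback only receives messages whose event type matches one of the types it declares.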
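The sketch below combines the two steps just described. It builds a `_callbacks_by_priority`-style dict with `itertools.product` and uses an event-type regex as the screen. `EventTypeFilter` is a hypothetical, simplified take on NotificationFilter that only looks at event_type and assumes prefix matching with `re.match`. The two endpoint classes are also hypothetical.

```python
import itertools
import re

PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']


class EventTypeFilter:
    """Hypothetical filter: matches event_type only, via re.match."""

    def __init__(self, event_types):
        # Same pattern construction as NotificationBase: join with '|'
        self._regex = re.compile('|'.join(event_types))

    def match(self, event_type):
        return event_type is not None and bool(self._regex.match(event_type))


class TelemetryLikeEndpoint:
    """Hypothetical sample endpoint with a filter_rule, like TelemetryIpc."""
    event_types = ['telemetry.api', 'telemetry.polling']

    def __init__(self):
        self.filter_rule = EventTypeFilter(self.event_types)

    def sample(self, messages):
        return len(messages)


class EventsLikeEndpoint:
    """Hypothetical endpoint without a filter_rule, handling info/error."""

    def info(self, messages):
        return 'info'

    def error(self, messages):
        return 'error'


def build_callbacks(endpoints):
    callbacks = {}
    for endpoint, prio in itertools.product(endpoints, PRIORITIES):
        if hasattr(endpoint, prio):
            method = getattr(endpoint, prio)
            screen = getattr(endpoint, 'filter_rule', None)
            callbacks.setdefault(prio, []).append((screen, method))
    return callbacks


callbacks = build_callbacks([EventsLikeEndpoint(), TelemetryLikeEndpoint()])
print(sorted(callbacks))  # ['error', 'info', 'sample']
screen, method = callbacks['sample'][0]
print(screen.match('telemetry.polling'), screen.match('compute.instance.exists'))
# True False
```

In this sketch, `re.match` anchors only at the start of the string. A pattern like `telemetry.api` therefore also accepts longer event types that begin with it, and an unescaped `.` matches any character.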
Back in get_batch_notification_listener: with the dispatcher covered, the next step creates a BatchNotificationServer instance.
Inheritance chain: BatchNotificationServer -> NotificationServerBase -> MessageHandlingServer
Its initialization just stores some parameters, and the resulting server object is returned as the listener.
Then _configure_main_queue_listeners calls listener.start(), which is MessageHandlingServer's start method:
@ordered(reset_after='stop')
def start(self, override_pool_size=None):
    if self._started:
        LOG.warning('The server has already been started. Ignoring '
                    'the redundant call to start().')
        return
    self._started = True
    executor_opts = {}
    if self.executor_type in ("threading", "eventlet"):
        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
    self._work_executor = self._executor_cls(**executor_opts)
    try:
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)
    self.listener.start(self._on_incoming)
The two key lines are self.listener = self._create_listener() and self.listener.start(self._on_incoming). _create_listener is actually implemented by NotificationServerBase:
def _create_listener(self):
    return self.transport._listen_for_notifications(
        self._targets_priorities, self._pool, self._batch_size,
        self._batch_timeout
    )
This in turn calls _listen_for_notifications:
def _listen_for_notifications(self, targets_and_priorities, pool,
                              batch_size, batch_timeout):
    for target, priority in targets_and_priorities:
        if not target.topic:
            raise exceptions.InvalidTarget('A target must have '
                                           'topic specified',
                                           target)
    return self._driver.listen_for_notifications(
        targets_and_priorities, pool, batch_size, batch_timeout
    )
Digging one level further, into the driver:
def listen_for_notifications(self, targets_and_priorities, pool,
                             batch_size, batch_timeout):
    conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
    listener = NotificationAMQPListener(self, conn)
    for target, priority in targets_and_priorities:
        conn.declare_topic_consumer(
            exchange_name=self._get_exchange(target),
            topic='%s.%s' % (target.topic, priority),
            callback=listener, queue_name=pool)
    return base.PollStyleListenerAdapter(listener, batch_size,
                                         batch_timeout)
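This code declares a consumer on each queue to listen on, with topic <target.topic>.<priority> (for example notifications.info). It then wraps the listener in a PollStyleListenerAdapter instance and returns it.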
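Which topics get consumers depends on the (target, priority) pairs passed down to listen_for_notifications. The sketch below assumes those pairs are the Cartesian product of the targets and the priorities that actually have callbacks (the keys of `_callbacks_by_priority`). It reproduces the `'%s.%s' % (target.topic, priority)` naming and the empty-topic check from the code above. `consumer_bindings` is a hypothetical helper, and `Target` is a namedtuple stand-in rather than oslo_messaging.Target.

```python
import itertools
from collections import namedtuple

# Hypothetical lightweight stand-in for oslo_messaging.Target
Target = namedtuple('Target', ['exchange', 'topic'])


def consumer_bindings(targets, supported_priorities):
    """Return sorted (exchange, topic) pairs a driver would declare consumers for."""
    bindings = set()
    for target, priority in itertools.product(targets, supported_priorities):
        if not target.topic:
            raise ValueError('A target must have topic specified')
        bindings.add((target.exchange, '%s.%s' % (target.topic, priority)))
    return sorted(bindings)


targets = [Target('nova', 'notifications'), Target('ceilometer', 'notifications')]
for exchange, topic in consumer_bindings(targets, ['info', 'sample']):
    print(exchange, topic)
# ceilometer notifications.info
# ceilometer notifications.sample
# nova notifications.info
# nova notifications.sample
```

With 14 exchanges and several priorities, as in the pdb output above, this product explains why the agent ends up binding many queues even though there is only one listener per transport URL.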
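When a PollStyleListenerAdapter is initialized, it creates a thread object:
File: oslo_messaging/_drivers/base.py: PollStyleListenerAdapter.__init__
self._listen_thread = threading.Thread(target=self._runner)
Calling start then launches this thread, which runs _runner. That function keeps fetching messages and passes them on, so they are eventually processed by _process_incoming.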
class PollStyleListenerAdapter(Listener):
    """A Listener that uses a PollStyleListener for message transfer.

    A dedicated thread is created to do message polling.
    """
    def __init__(self, poll_style_listener, batch_size, batch_timeout):
        super(PollStyleListenerAdapter, self).__init__(
            batch_size, batch_timeout, poll_style_listener.prefetch_size
        )
        self._poll_style_listener = poll_style_listener
        self._listen_thread = threading.Thread(target=self._runner)
        self._listen_thread.daemon = True
        self._started = False

    def start(self, on_incoming_callback):
        super(PollStyleListenerAdapter, self).start(on_incoming_callback)
        self._started = True
        self._listen_thread.start()

    @excutils.forever_retry_uncaught_exceptions
    def _runner(self):
        while self._started:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)
            if incoming:
                self.on_incoming_callback(incoming)
        # listener is stopped but we need to process all already consumed
        # messages
        while True:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)
            if not incoming:
                return
            self.on_incoming_callback(incoming)
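The last line of MessageHandlingServer.start is self.listener.start(self._on_incoming), which calls the start method in the code above. Once started, the thread runs _runner, which keeps pulling data from the queue through poll. Each batch it retrieves is handled by self.on_incoming_callback(incoming), that is, by MessageHandlingServer's self._on_incoming: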
def _on_incoming(self, incoming):
    """Handles on_incoming event

    :param incoming: incoming request.
    """
    self._work_executor.submit(self._process_incoming, incoming)
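The threading model described above can be sketched with the standard library alone. A daemon thread polls and hands each batch to a callback, and that callback submits the real work to an executor. `PollingAdapter` is a hypothetical, heavily simplified analogue of PollStyleListenerAdapter. A `queue.Queue` stands in for the AMQP consumer and `concurrent.futures.ThreadPoolExecutor` for the server's work executor. There is no retry decorator and no real batch-timeout handling.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class PollingAdapter:
    """Daemon thread that polls a queue and passes batches to a callback."""

    def __init__(self, source, batch_size=1, poll_timeout=0.05):
        self._source = source
        self._batch_size = batch_size
        self._poll_timeout = poll_timeout
        self._started = False
        self._thread = threading.Thread(target=self._runner, daemon=True)

    def _poll(self):
        batch = []
        try:
            batch.append(self._source.get(timeout=self._poll_timeout))
            while len(batch) < self._batch_size:
                batch.append(self._source.get_nowait())
        except queue.Empty:
            pass
        return batch

    def start(self, on_incoming):
        self._on_incoming = on_incoming
        self._started = True
        self._thread.start()

    def stop(self):
        self._started = False
        self._thread.join()

    def _runner(self):
        while self._started:
            incoming = self._poll()
            if incoming:
                self._on_incoming(incoming)
        # After stop(), drain messages that are still queued
        while True:
            incoming = self._poll()
            if not incoming:
                return
            self._on_incoming(incoming)


processed = []
lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=4)


def process_incoming(batch):  # plays the role of _process_incoming
    with lock:
        processed.extend(batch)


def on_incoming(batch):  # plays the role of _on_incoming
    executor.submit(process_incoming, batch)


q = queue.Queue()
for i in range(5):
    q.put(i)
adapter = PollingAdapter(q, batch_size=2)
adapter.start(on_incoming)
adapter.stop()
executor.shutdown(wait=True)
print(sorted(processed))  # [0, 1, 2, 3, 4]
```

The polling thread only submits work and never blocks on processing, which is why a slow endpoint does not stop messages from being pulled off the queue.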
_on_incoming submits each batch to the work executor created in start, so the actual processing is done by self._process_incoming, that is, by BatchNotificationServer's _process_incoming:
class BatchNotificationServer(NotificationServerBase):

    def _process_incoming(self, incoming):
        try:
            not_processed_messages = self.dispatcher.dispatch(incoming)
        except Exception:
            ......
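This handler hands the messages to the dispatcher object. self.dispatcher is the BatchNotificationDispatcher created earlier in get_batch_notification_listener, whose __init__ comes from NotificationDispatcher. The call therefore goes to: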
class BatchNotificationDispatcher(NotificationDispatcher):
    """A message dispatcher which understands Notification messages.

    A MessageHandlingServer is constructed by passing a callable dispatcher
    which is invoked with a list of message dictionaries each time
    'batch_size' messages are received or 'batch_timeout' seconds is reached.
    """

    def dispatch(self, incoming):
        """Dispatch notification messages to the appropriate endpoint method.
        """
        messages_grouped = itertools.groupby(sorted(
            (self._extract_user_message(m) for m in incoming),
            key=operator.itemgetter(0)), operator.itemgetter(0))
        requeues = set()
        for priority, messages in messages_grouped:
            __, raw_messages, messages = six.moves.zip(*messages)
            if priority not in PRIORITIES:
                LOG.warning('Unknown priority "%s"', priority)
                continue
            for screen, callback in self._callbacks_by_priority.get(priority,
                                                                    []):
                if screen:
                    filtered_messages = [message for message in messages
                                         if screen.match(
                                             message["ctxt"],
                                             message["publisher_id"],
                                             message["event_type"],
                                             message["metadata"],
                                             message["payload"])]
                else:
                    filtered_messages = list(messages)
                if not filtered_messages:
                    continue
                ret = self._exec_callback(callback, filtered_messages)
                if ret == NotificationResult.REQUEUE:
                    requeues.update(raw_messages)
                    break
        return requeues

    def _exec_callback(self, callback, messages):
        try:
            return callback(messages)
        except Exception:
            LOG.exception("Callback raised an exception.")
            return NotificationResult.REQUEUE
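The key point is that the message's priority field is used to look up the matching plugin methods, the callbacks, in the self._callbacks_by_priority dict, and those callbacks are then invoked. For example, a message on notifications.sample is matched to the sample method of TelemetryIpc, which is registered by this entry point: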
_sample = ceilometer.telemetry.notifications:TelemetryIpc
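BatchNotificationDispatcher.dispatch sorts and groups messages by priority, filters them per callback, and collects the ones to requeue. That logic can be exercised on its own with the sketch below. It assumes messages are plain dicts with `id`, `priority` and `event_type` keys, that a screen is any predicate callable (instead of a NotificationFilter), and uses a string in place of NotificationResult.REQUEUE. `dispatch` here is a hypothetical simplification, not the oslo.messaging function.

```python
import itertools
import operator

PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
REQUEUE = 'requeue'  # hypothetical stand-in for NotificationResult.REQUEUE


def dispatch(incoming, callbacks_by_priority):
    """Return the ids of messages that should be requeued."""
    keyed = sorted(((m['priority'], m) for m in incoming),
                   key=operator.itemgetter(0))
    requeues = set()
    for priority, group in itertools.groupby(keyed, operator.itemgetter(0)):
        messages = [m for _, m in group]
        if priority not in PRIORITIES:
            continue  # the real code logs 'Unknown priority' and skips
        for screen, callback in callbacks_by_priority.get(priority, []):
            filtered = [m for m in messages if screen is None or screen(m)]
            if not filtered:
                continue
            try:
                ret = callback(filtered)
            except Exception:
                ret = REQUEUE
            if ret == REQUEUE:
                # like the real code, requeue every message of this priority
                requeues.update(m['id'] for m in messages)
                break
    return requeues


seen = []


def sample_cb(msgs):
    seen.extend(m['id'] for m in msgs)


def failing_cb(msgs):
    raise RuntimeError('boom')


callbacks = {
    'sample': [(lambda m: m['event_type'].startswith('telemetry.'), sample_cb)],
    'error': [(None, failing_cb)],
}
incoming = [
    {'id': 1, 'priority': 'sample', 'event_type': 'telemetry.polling'},
    {'id': 2, 'priority': 'sample', 'event_type': 'compute.instance.exists'},
    {'id': 3, 'priority': 'error', 'event_type': 'compute.instance.error'},
    {'id': 4, 'priority': 'bogus', 'event_type': 'x'},
]
print(dispatch(incoming, callbacks), seen)  # {3} [1]
```

Message 2 is silently dropped by the screen, message 4 is skipped for its unknown priority, and only the failing callback causes a requeue.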
TelemetryIpc does not define sample itself. The call goes to the method inherited from its parent class: ceilometer/agent/plugin_base.py:NotificationBase.sample
def sample(self, notifications):
    self._process_notifications('sample', notifications)

def _process_notifications(self, priority, notifications):
    for notification in notifications:
        try:
            notification = messaging.convert_to_old_notification_format(
                priority, notification)
            self.to_samples_and_publish(notification)
        except Exception:
            LOG.error(_LE('Fail to process notification'), exc_info=True)

def to_samples_and_publish(self, notification):
    with self.manager.publisher() as p:
        p(list(self.process_notification(notification)))
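The calls go _process_notifications -> to_samples_and_publish in turn. The important part is the two lines of to_samples_and_publish: with self.manager.publisher() as p: and p(list(self.process_notification(notification))). The manager was passed in when the plugin was instantiated and is a PipelineManager instance, so let's look at PipelineManager.publisher: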
def publisher(self):
    return PublishContext(self.pipelines)
self.pipelines was built in __init__. Now let's look at PublishContext:
class PublishContext(object):
    def __init__(self, pipelines=None):
        pipelines = pipelines or []
        self.pipelines = set(pipelines)

    def add_pipelines(self, pipelines):
        self.pipelines.update(pipelines)

    def __enter__(self):
        def p(data):
            for p in self.pipelines:
                p.publish_data(data)
        return p

    def __exit__(self, exc_type, exc_value, traceback):
        for p in self.pipelines:
            p.flush()
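Because publisher() is used in a with statement, look at __enter__. Its return value is bound to the name after as, so calling p(...) runs the inner function with data set to list(self.process_notification(notification)). That function calls publish_data on every pipeline in the context. In general, a pipeline here is either a SamplePipeline or an EventPipeline.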
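The sketch below re-implements PublishContext with the same structure to show what the `with` statement provides. Only the inner loop variable is renamed, to avoid shadowing `p`. It drives the context with hypothetical `RecordingPipeline` objects. Every pipeline receives the data through the function returned by `__enter__`, and `__exit__` flushes all of them when the block ends, even if the block raises.

```python
class RecordingPipeline:
    """Hypothetical pipeline that records published data and flush calls."""

    def __init__(self, name):
        self.name = name
        self.published = []
        self.flushed = 0

    def publish_data(self, data):
        self.published.extend(data)

    def flush(self):
        self.flushed += 1


class PublishContext:
    """Same shape as the PublishContext shown above."""

    def __init__(self, pipelines=None):
        self.pipelines = set(pipelines or [])

    def __enter__(self):
        def p(data):
            for pipe in self.pipelines:
                pipe.publish_data(data)
        return p

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any exception from the with-block propagate
        for pipe in self.pipelines:
            pipe.flush()


a, b = RecordingPipeline('a'), RecordingPipeline('b')
with PublishContext([a, b]) as p:
    p(['sample-1', 'sample-2'])
print(a.published, b.flushed)  # ['sample-1', 'sample-2'] 1
```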
Taking SamplePipeline as an example, its publish_data method is:
def publish_data(self, samples):
    if not isinstance(samples, list):
        samples = [samples]
    supported = [s for s in samples if self.source.support_meter(s.name)
                 and self._validate_volume(s)]
    self.sink.publish_samples(supported)
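self.source.support_meter(s.name) decides whether a sample belongs to this pipeline's source. The sketch below makes three assumptions. The meter list is matched with `fnmatch` wildcards. Entries starting with `!` are exclusions checked first (deny, then allow). A list containing only exclusions allows everything else. `is_supported` is a hypothetical re-implementation for illustration, not ceilometer's code. The patterns are taken from the pdb output of pipeline.yaml and event_pipeline.yaml shown earlier.

```python
import fnmatch


def is_supported(patterns, name):
    """Deny-then-allow matching over fnmatch patterns; '!' marks an exclusion."""
    if any(fnmatch.fnmatch(name, p[1:]) for p in patterns if p.startswith('!')):
        return False
    if any(fnmatch.fnmatch(name, p) for p in patterns if not p.startswith('!')):
        return True
    # Only exclusions configured: everything not excluded is allowed
    return all(p.startswith('!') for p in patterns)


meters = ['poll.*', 'memory.usage', 'memory.util']  # meter_source
print(is_supported(meters, 'poll.cpu'), is_supported(meters, 'disk.read.bytes'))
# True False

events = ['*', '!compute.instance.exists', '!volume.exists']  # event_source (trimmed)
print(is_supported(events, 'volume.exists'), is_supported(events, 'image.update'))
# False True
```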
publish_data then calls SampleSink's publish_samples method:
def _publish_samples(self, start, samples):
    """Push samples into pipeline for publishing.

    :param start: The first transformer that the sample will be injected.
                  This is mainly for flush() invocation that transformer
                  may emit samples.
    :param samples: Sample list.
    """
    transformed_samples = []
    if not self.transformers:
        transformed_samples = samples
    else:
        for sample in samples:
            LOG.debug(
                "Pipeline %(pipeline)s: Transform sample "
                "%(smp)s from %(trans)s transformer",
                {'pipeline': self,
                 'smp': sample,
                 'trans': start})
            sample = self._transform_sample(start, sample)
            if sample:
                transformed_samples.append(sample)
    if transformed_samples:
        for p in self.publishers:
            try:
                p.publish_samples(transformed_samples)
            except Exception:
                LOG.exception(_(
                    "Pipeline %(pipeline)s: Continue after error "
                    "from publisher %(pub)s") % ({'pipeline': self,
                                                  'pub': p}))

def publish_samples(self, samples):
    self._publish_samples(0, samples)
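publish_samples calls self._publish_samples, which assembles transformed_samples and then calls publish_samples on each publisher in self.publishers. We saw earlier that the publishers list holds the notifier plugin, a SampleNotifierPublisher. That class inherits from NotifierPublisher, which in turn inherits from MessagingPublisher, so the method called is MessagingPublisher.publish_samples: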
def publish_samples(self, samples):
    """Publish samples on RPC.

    :param samples: Samples from pipeline after transformation.
    """
    meters = [
        utils.meter_message_from_counter(
            sample, cfg.CONF.publisher.telemetry_secret)
        for sample in samples
    ]
    topic = cfg.CONF.publisher_notifier.metering_topic
    self.local_queue.append((topic, meters))
    if self.per_meter_topic:
        for meter_name, meter_list in itertools.groupby(
                sorted(meters, key=operator.itemgetter('counter_name')),
                operator.itemgetter('counter_name')):
            meter_list = list(meter_list)
            topic_name = topic + '.' + meter_name
            LOG.debug('Publishing %(m)d samples on %(n)s',
                      {'m': len(meter_list), 'n': topic_name})
            self.local_queue.append((topic_name, meter_list))
    self.flush()
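The topic read from cfg.CONF.publisher_notifier.metering_topic is metering. At the end, flush runs. flush calls _process_queue, which uses the _send method to send the data out: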
def _send(self, event_type, data):
    try:
        self.notifier.sample({}, event_type=event_type,
                             payload=data)
    except oslo_messaging.MessageDeliveryFailure as e:
        raise_delivery_failure(e)
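The per_meter_topic branch of MessagingPublisher.publish_samples can be isolated as follows. Meters are assumed to be plain dicts carrying a `counter_name` key, the kind of dict utils.meter_message_from_counter is expected to produce. `queue_messages` is a hypothetical helper that returns what would be appended to `local_queue` instead of sending anything.

```python
import itertools
import operator


def queue_messages(meters, topic='metering', per_meter_topic=False):
    """Return the (topic, meters) tuples the publisher would enqueue."""
    local_queue = [(topic, meters)]
    if per_meter_topic:
        # groupby only merges adjacent items, hence the sort by counter_name
        ordered = sorted(meters, key=operator.itemgetter('counter_name'))
        for meter_name, meter_list in itertools.groupby(
                ordered, operator.itemgetter('counter_name')):
            local_queue.append((topic + '.' + meter_name, list(meter_list)))
    return local_queue


meters = [{'counter_name': 'cpu', 'counter_volume': 1},
          {'counter_name': 'memory.usage', 'counter_volume': 512},
          {'counter_name': 'cpu', 'counter_volume': 2}]
for t, batch in queue_messages(meters, per_meter_topic=True):
    print(t, len(batch))
# metering 3
# metering.cpu 2
# metering.memory.usage 1
```

With per_meter_topic enabled, every sample is enqueued twice: once on the shared metering topic and once on its per-meter topic.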
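Because notifier.sample is used, the data ends up on the metering.sample queue. From here on, the message flow is the same as in the polling-compute service. For details, see the polling-compute article.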
References:
https://www.cnblogs.com/luohaixian/p/11145939.html