前言
本文主要记录社区版本创建路由的大致过程,路由功能通过linux namespace的功能来实现。
前提
neutron-l3-agent功能分析
- neutron-l3-agent启动部分,l3-agent服务启动线程对路由队列去做router_update的操作:
PS:篇幅有限,只提取本文关注部分,neutron-l3-agent服务启动过程日后会单独分析;
neutron.agent.l3.agent
class L3NATAgentWithStateReport(L3NATAgent):
......
def after_start(self):
# l3-agent启动之后,生成线程,执行工作
eventlet.spawn_n(self._process_routers_loop)
class L3NATAgent():
......
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
pool = eventlet.GreenPool(size=8)
while True:
# 循环创建新的线程池去执行router_update
pool.spawn_n(self._process_router_update)
- 优先级队列
这里引入一个新的概念,优先级队列;实现过程中创建路由或对路由有信息更改时,传入信息到队列,然后由上文中循环线程提取队列信息,消化队列;
同样由于篇幅有限,之后再剖出这部分内容,提取关键信息如下:
neutron.agent.l3.agent
class L3NATAgent():
def __init__(self):
......
# 初始化队列
self._queue = queue.ResourceProcessingQueue()
创建路由
l3_plugin
plugin部分需要判断创建路由时是否关联外部网络,如果关联才会调用l3_agent进行处理。
neutron.db.l3_db
class L3_NAT_db_mixin():
def create_router(self, context, router):
router_dict = super(L3_NAT_db_mixin, self).create_router(context, router)
# 根据POST信息生成路由信息,存入数据库
if router_dict.get('external_gateway_info'):
self.notify_router_updated(context, router_dict['id'], None)
# 如果路由创建时有外部网关信息,则调用l3_agent做处理
return router_dict
def notify_router_updated():
if router_id:
self.l3_rpc_notifier.routers_updated(
context, [router_id], operation)
neutron.api.rpc.agentnotifiers.l3_rpc_agent_api
class L3AgentNotifyApi():
def routers_updated(self, context, router_ids, operation=None, data=None, shuffle_agents=False, schedule_routers=True):
......
if router_ids:
self._notification(context, 'routers_updated', router_ids,
operation, shuffle_agents, schedule_routers)
def _notification(self, context, method, router_ids, operation, shuffle_agents, schedule_routers=True):
......
plugin = directory.get_plugin(plugin_constants.L3)
if schedule_routers:
# 创建路由和l3_agent的绑定,数据存入数据库
plugin.schedule_routers(adminContext, router_ids)
self._agent_notification(
context, method, router_ids, operation, shuffle_agents)
def _agent_notification(self, context, method, router_ids, operation, shuffle_agents):
plugin = directory.get_plugin(plugin_constants.L3) # l3_plugin
for router_id in router_ids:
# 获取路由绑定的l3_agent所在的主机
hosts = plugin.get_hosts_to_notify(adminContext, router_id)
for host in hosts:
# 发送cast信息,method='routers_updated'
cctxt = self.client.prepare(topic=topics.L3_AGENT,
server=host,
version='1.1')
cctxt.cast(context, method, routers=[router_id])
l3_agent
传过来的routers_updated信息附上优先级等信息,加入到队列self._queue中
neutorn.agent.l3.agent
class L3NATAgent():
def routers_updated(self, context, routers):
if routers:
......
for id in routers:
update = queue.ResourceUpdate(
id, PRIORITY_RPC, action=ADD_UPDATE_ROUTER)
self._queue.add(update)
加入到队列中后,等待前文进程的提取
neutron.agnet.l3.agent
class L3NATAgent():
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_resource():
......
routers = [update.resource] if update.resource else [] # []
......
not_delete_no_routers = (update.action != DELETE_ROUTER and not routers) # True
if not_delete_no_routers or related_action:
try:
update.timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(self.context, [update.id]) # 获取路由详细信息
......
if not self._process_routers_if_compatible(routers, update): # False
def _process_routers_if_compatible(self, routers, update):
process_result = True
for router in routers:
try:
self._process_router_if_compatible(router)
def _process_router_if_compatible(self, router):
if router['id'] not in self.router_info:
# 增加该路由
self._process_added_router(router)
/主要分析此方法/
def _process_added_router(self, router):
self._router_added(router['id'], router)
ri = self.router_info[router['id']]
ri.router = router
ri.process()
registry.notify(resources.ROUTER, events.AFTER_CREATE, self, router=ri)
self.l3_ext_manager.add_router(self.context, router)
- _router_added 路由创建
neutron.agent.l3.agent
class L3NATAgent():
def _router_added(self, router_id, router):
ri = self._create_router(router_id, router)
self.router_info[router_id] = ri
try:
ri.initialize(self.process_monitor)
def _create_router(self, router_id, router):
# 获取部分agent信息,返回路由对象
args = []
kwargs = {
'agent': self,
'router_id': router_id,
'router': router,
'use_ipv6': self.use_ipv6,
'agent_conf': self.conf,
'interface_driver': self.driver, # neutron.agent.linux.interface:OVSInterfaceDriver
}
return legacy_router.LegacyRouter(*args, **kwargs)
ri.initialize(self.process_monitor)
neutron.agent.l3.router_info
class RouterInfo():
def initialize(self, process_monitor):
self.process_monitor = process_monitor
self.radvd = ra.DaemonMonitor(self.router_id,
self.ns_name,
process_monitor,
self.get_internal_device_name,
self.agent_conf)
self.router_namespace.create()
self.router_namespace.create():创建路由对应命名空间
neutron.agent.l3.namespaces
class Namespace():
def create(self, ipv6_forwarding=True):
# 判断命名空间是否存在,不存在则创建新的
# 并在命名空间执行sysctl -w net.ipv4.conf.all.promote_secondaries=1
# 命名空间内执行ip link set lo up
# ...执行sysctl -w net.ipv4.ip_forward=1
# ...执行sysctl -w net.ipv4.conf.all.arp_ignore=1
# ...执行sysctl -w net.ipv4.conf.all.arp_announce=2
ip_wrapper = self.ip_wrapper_root.ensure_namespace(self.name)
cmd = ['sysctl', '-w', 'net.ipv4.ip_forward=1']
ip_wrapper.netns.execute(cmd)
cmd = ['sysctl', '-w', 'net.ipv4.conf.all.arp_ignore=1']
ip_wrapper.netns.execute(cmd)
cmd = ['sysctl', '-w', 'net.ipv4.conf.all.arp_announce=2']
ip_wrapper.netns.execute(cmd)
- ri.process()
neutron.agent.l3.router_info
class RouterInfo():
def process(self):
# 创建interface qg-xxxxx
# ovs-vsctl add-port br-int qg-xxxxx
# ovs-vsctl set Port qg-xxxxx tag=4095
# ovs-vsctl set Interface qg-xxxxx type=internal external_ids:iface-id=e0ae3934-2ba0-4723-bd75-f35bef5a128a external_ids:iface-status=active external_ids:attached-mac=fa:16:3e:ff:ff:3f
# 命名空间内ip link set address fa:16:3e:ff:ff:3f dev qg-xxxx
# ......ip link set qg-xxxxx mtu 1500
# ......ip link set qg-xxxxx up
# ......加网关ip给qg-xxxxx
# 增加iptables规则允许snat
self.process_external()
完成大致过程
总结
使用社区l3时,创建路由时,且有网关的情况下,是使用linux的namespace的功能来进行转发;
创建namespace后,在ovs的网桥上关联qg-xxxxx设备,给与tag,mac等信息,在namespace中给qg-xxxxx设备指定ip;之后在为命名空间更新Iptables;
所有的操作都是调用linux系统命令来进行
后续
记录需要之后更新的内容
- 队列系统,需要以后进行剖析:队列系统根据存入时间,存入资源的ID等进行提取,并且在创建失败后,重新将任务存入队列实现失败后重复创建的功能。
- OpenStack中调用Linux系统执行任务时,所做的工作和代码解析。
- neutron-l3-agent的启动过程。