源码路径: github.com/projectcalico/felix/routetable/route_table.go
RouteTable 结构体定义如下:
type RouteTable struct { logCxt *log.Entry ipVersion uint8 netlinkFamily int netlinkTimeout time.Duration // numConsistentNetlinkFailures counts the number of repeated netlink connection failures. // reset on successful connection. numConsistentNetlinkFailures int // Current netlink handle, or nil if we need to reconnect. cachedNetlinkHandle HandleIface dirtyIfaces set.Set ifacePrefixes set.Set ifacePrefixRegexp *regexp.Regexp ifaceNameToTargets map[string][]Target ifaceNameToL2Targets map[string][]L2Target ifaceNameToFirstSeen map[string]time.Time pendingIfaceNameToTargets map[string][]Target pendingIfaceNameToL2Targets map[string][]L2Target pendingConntrackCleanups map[ip.Addr]chan struct{} inSync bool // Whether this route table is managing vxlan routes. vxlan bool // Testing shims, swapped with mock versions for UT newNetlinkHandle func() (HandleIface, error) addStaticARPEntry func(cidr ip.CIDR, destMAC net.HardwareAddr, ifaceName string) error conntrack conntrackIface time timeIface }
NewIntDataplaneDriver
--> routetable.New
1. 实例化 RouteTable
- interfacePrefixes 接口前缀为 cali
- ipVersion 为 4
- NetlinkTimeout 为 10000000000(time.Duration,单位为纳秒),即 10s
// New creates a RouteTable by delegating to NewWithShims, passing the
// package's own netlink-handle factory, static-ARP helper, a fresh conntrack
// instance and the realTime clock as the shim implementations.
//
// interfacePrefixes, ipVersion, vxlan and netlinkTimeout are forwarded
// unchanged.
func New(interfacePrefixes []string, ipVersion uint8, vxlan bool, netlinkTimeout time.Duration) *RouteTable {
	connTracker := conntrack.New()
	clock := realTime{}

	table := NewWithShims(
		interfacePrefixes,
		ipVersion,
		newNetlinkHandle,
		vxlan,
		netlinkTimeout,
		addStaticARPEntry,
		connTracker,
		clock,
	)
	return table
}
loopUpdatingDataplane
--> processIfaceUpdate
--> onIfaceStateChanged
2. OnIfaceStateChanged 函数
如果接口名不匹配接口前缀正则(例如 ^cali.*),说明不是 calico 接口,直接忽略;该函数只处理 calico 接口
如果接口状态为 up,则将其加入 dirtyIfaces,等待后续同步该接口的路由,并调用 onIfaceSeen
// OnIfaceStateChanged handles a state notification for the interface named
// ifaceName.
//
// Interfaces whose name does not match the Calico prefix regexp are ignored.
// A matching interface that is reported as up is added to the dirty set so
// that its routes get synced, and onIfaceSeen is called for it. Any other
// state is not acted on here.
func (r *RouteTable) OnIfaceStateChanged(ifaceName string, state ifacemonitor.State) {
	entry := r.logCxt.WithField("ifaceName", ifaceName)
	isCalicoIface := r.ifacePrefixRegexp.MatchString(ifaceName)

	switch {
	case !isCalicoIface:
		entry.Debug("Ignoring interface state change, not a Calico interface.")
	case state == ifacemonitor.StateUp:
		entry.Debug("Interface up, marking for route sync")
		r.dirtyIfaces.Add(ifaceName)
		r.onIfaceSeen(ifaceName)
	}
}
loopUpdatingDataplane
--> apply
--> RouteTable Apply
3. RouteTable Apply 函数
func (r *RouteTable) Apply() error {
if !r.inSync {
listStartTime := time.Now()
nl, err := r.getNetlinkHandle()
if err != nil {
r.logCxt.WithError(err).Error("Failed to connect to netlink, retrying...")
return ConnectFailed
}
links, err := nl.LinkList()
if err != nil {
r.logCxt.WithError(err).Error("Failed to list interfaces, retrying...")
r.closeNetlinkHandle() // Defensive: force a netlink reconnection next time.
return ListFailed
}
QueueResync 会把 inSync 置为 false,因此下一次 Apply 会进入上面的全量同步分支:先通过 getNetlinkHandle 获取 netlink 连接,再用 LinkList 列出所有接口。QueueResync 如下:
// QueueResync marks the route table as out of sync and logs that a resync
// was queued. Nothing is re-read here; the flag is only cleared of its
// "in sync" state so that later processing can pick it up.
func (r *RouteTable) QueueResync() {
	r.inSync = false
	r.logCxt.Info("Queueing a resync of routing table.")
}
3.1 将 calico 接口名加入到 dirtyIfaces 中
Apply 随后会迭代 dirtyIfaces 中的接口并逐个同步,后文继续分析
// Clear the dirty set; there's no point trying to update non-existent interfaces.
r.dirtyIfaces = set.New()
for _, link := range links {
attrs := link.Attrs()
if attrs == nil {
continue
}
ifaceName := attrs.Name
if r.ifacePrefixRegexp.MatchString(ifaceName) {
r.logCxt.WithField("ifaceName", ifaceName).Debug(
"Resync: found calico-owned interface")
r.dirtyIfaces.Add(ifaceName)
r.onIfaceSeen(ifaceName)
}
}
迭代所有 dirtyIfaces 进行处理,一步步分析,最大重试次数为 2
3.2 如果设置为 vxlan 模式
if r.vxlan {
// Sync L2 routes first.
err := r.syncL2RoutesForLink(ifaceName)
if err == IfaceNotPresent {
logCxt.Info("Interface missing, will retry if it appears.")
break
} else if err == IfaceDown {
logCxt.Info("Interface down, will retry if it goes up.")
break
} else if err != nil {
logCxt.WithError(err).Warn("Failed to syncronise routes.")
retries--
continue
}
logCxt.Debug("Synchronised L2 routes on interface")
}
3.2.1 syncL2RoutesForLink
此前由 SetL2Routes 写入 pendingIfaceNameToL2Targets 的目标,在本轮被移入 ifaceNameToL2Targets(值为 nil 时则删除该接口的记录),随后从 ifaceNameToL2Targets 取出本轮期望的 L2 目标 expectedTargets
func (r *RouteTable) syncL2RoutesForLink(ifaceName string) error {
logCxt := r.logCxt.WithField("ifaceName", ifaceName)
logCxt.Debug("Syncing interface L2 routes")
if updatedTargets, ok := r.pendingIfaceNameToL2Targets[ifaceName]; ok {
logCxt.Debug("Have updated targets.")
if updatedTargets == nil {
delete(r.ifaceNameToL2Targets, ifaceName)
} else {
r.ifaceNameToL2Targets[ifaceName] = updatedTargets
}
delete(r.pendingIfaceNameToL2Targets, ifaceName)
}
expectedTargets := r.ifaceNameToL2Targets[ifaceName]
3.2.1.1 arp 表 和 fdb 表
- ARP表:IP和MAC的对应关系;
- FDB表:MAC+VLAN和PORT的对应关系
ARP是三层转发,FDB是用于二层转发。两个设备不在一个网段或者没配IP,只要两者之间的链路层是连通的,就可以通过FDB表进行数据的转发
FDB表的作用就在于告诉设备从某个端口出去就可以到某个目的MAC
// Build maps based on desired target state, used below to clean up
// stale entries. Each L2 target results in an ARP entry as well as
// a FDB entry.
expectedARPEntries := map[string]net.HardwareAddr{}
expectedFDBEntries := map[string]net.HardwareAddr{}
for _, target := range expectedTargets {
expectedARPEntries[target.GW.String()] = target.VTEPMAC
expectedFDBEntries[target.IP.String()] = target.VTEPMAC
}
3.2.1.2 查看 arp 地址表,相当于 ip neighbor show
# ip neighbor show
192.168.1.1 dev enp0s3 lladdr f4:b8:a7:4b:57:da STALE
192.168.1.6 dev enp0s3 lladdr 74:c6:3b:61:51:fb REACHABLE
fe80::1 dev enp0s3 lladdr f4:b8:a7:4b:57:da router STALE
// Get the current set of neighbors on this interface.
existingNeigh, err := netlink.NeighList(linkAttrs.Index, netlink.FAMILY_V4)
if err != nil {
return err
}
3.2.1.3 遍历接口上已有的邻居表项(ARP 和 FDB),如果不在期望的集合中则删除,相当于执行 ip neighbor del
// For each existing neighbor, if it is not present in the expected set, then remove it.
var updatesFailed bool
for _, existing := range existingNeigh {
if existing.Family == syscall.AF_BRIDGE {
// FDB entries have family set to bridge.
if _, ok := expectedFDBEntries[existing.IP.String()]; !ok {
logCxt.WithField("neighbor", existing).Info("Removing old neighbor entry (FDB)")
if err := netlink.NeighDel(&existing); err != nil {
updatesFailed = true
continue
}
}
} else {
if _, ok := expectedARPEntries[existing.IP.String()]; !ok {
logCxt.WithField("neighbor", existing).Info("Removing old neighbor entry (ARP)")
if err := netlink.NeighDel(&existing); err != nil {
updatesFailed = true
continue
}
}
}
}
3.3 syncRoutesForLink 同步设置 L3 路由
// Sync L3 routes.
err := r.syncRoutesForLink(ifaceName)
if err == IfaceNotPresent {
logCxt.Info("Interface missing, will retry if it appears.")
break
} else if err == IfaceDown {
logCxt.Info("Interface down, will retry if it goes up.")
break
} else if err == IfaceGrace {
logCxt.Info("Interface in cleanup grace period, will retry after.")
graceIfaces++
return nil
} else if err != nil {
logCxt.WithError(err).Warn("Failed to syncronise routes.")
retries--
continue
}
3.3.1 syncRoutesForLink 函数
inGracePeriod 表示接口是否仍处于首次发现后的宽限期内:CNI 插件会预先为接口添加路由,为避免 Felix 在获知 endpoint 之前就删除这些路由(造成路由抖动),宽限期内暂不删除非期望的路由
func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
startTime := time.Now()
defer func() {
perIfaceSyncTime.Observe(time.Since(startTime).Seconds())
}()
logCxt := r.logCxt.WithField("ifaceName", ifaceName)
logCxt.Debug("Syncing interface routes")
// In order to allow Calico to run without Felix in an emergency, the CNI plugin pre-adds
// the route to the interface. To avoid flapping the route when Felix sees the interface
// before learning about the endpoint, we give each interface a grace period after we first
// see it before we remove routes that we're not expecting. Check whether the grace period
// applies to this interface.
inGracePeriod := r.time.Since(r.ifaceNameToFirstSeen[ifaceName]) < cleanupGracePeriod
leaveDirty := false
3.3.1.1 列出该接口上的现有路由
// Got the link; try to sync its routes. Note: We used to check if the interface
// was oper down before we tried to do the sync but that prevented us from removing
// routes from an interface in some corner cases (such as being admin up but oper
// down).
linkAttrs := link.Attrs()
oldRoutes, err := nl.RouteList(link, r.netlinkFamily)
3.3.1.2 遍历旧路由:在期望集合(expectedCIDRs)中的路由无需处理;非 vxlan 模式且仍处于宽限期内时暂不清理;其余路由将被删除
for _, route := range oldRoutes {
seenCIDRs.Add(dest)
if expectedCIDRs.Contains(dest) {
logCxt.Debug("Syncing routes: Found expected route.")
continue
}
if !r.vxlan && inGracePeriod {
continue
}
logCxt.Info("Syncing routes: removing old route.")
}
3.3.1.3 对于期望的目标,如果其 CIDR 不在 seenCIDRs 中,则调用 RouteAdd 添加路由
for _, target := range expectedTargets {
cidr := target.CIDR
if !seenCIDRs.Contains(cidr) {
logCxt := logCxt.WithField("targetCIDR", target.CIDR)
logCxt.Info("Syncing routes: adding new route.")
ipNet := cidr.ToIPNet()
route := netlink.Route{
LinkIndex: linkAttrs.Index,
Dst: &ipNet,
Type: syscall.RTN_UNICAST,
Protocol: syscall.RTPROT_BOOT,
Scope: netlink.SCOPE_LINK,
}
if target.GW != nil {
route.Gw = target.GW.AsNetIP()
}
if target.Type == TargetTypeVXLAN {
route.Scope = netlink.SCOPE_UNIVERSE
route.SetFlag(syscall.RTNH_F_ONLINK)
}
// In case this IP is being re-used, wait for any previous conntrack entry
// to be cleaned up. (No-op if there are no pending deletes.)
r.waitForPendingConntrackDeletion(cidr.Addr())
if err := nl.RouteAdd(&route); err != nil {
logCxt.WithError(err).Warn("Failed to add route")
updatesFailed = true
}
}
3.3.1.4 addStaticARPEntry
如果是 IPv4 且目标设置了 DestMAC,则添加静态 ARP 表项,相当于 arp -i eth0 -s 192.168.*** ff:ee:ee:ee:ee:ee;可用 arp -a 查看,如下:
# arp -a
? (192.168.72.134) at d4:bb:c8:a6:60:c1 [ether] on enp0s3
? (192.168.72.125) at <incomplete> on enp0s3
? (192.168.73.122) at dc:72:9b:42:02:b9 [ether] on enp0s3
gateway (192.168.72.1) at 3c:15:fb:09:8b:8c [ether] on enp0s3
? (192.168.72.140) at fc:2a:9c:ec:63:77 [ether] on enp0s3
? (192.170.77.147) at 7a:ae:8c:b8:09:e9 [ether] on cali691e7cb552c
? (192.168.74.100) at 20:ab:37:5e:82:5b [ether] on enp0s3
? (192.168.72.132) at 5c:03:39:9d:c8:c3 [ether] on enp0s3
? (192.168.73.64) at 74:c6:3b:61:51:fb [ether] on enp0s3
? (192.168.73.100) at f8:38:80:3e:b9:aa [ether] on enp0s3
? (192.168.73.232) at 84:be:52:ae:f1:fc [ether] on enp0s3
? (192.168.72.62) at c8:14:51:4e:39:66 [ether] on enp0s3
? (192.168.73.50) at 4c:32:75:08:84:34 [ether] on enp0s3
? (192.168.72.212) at 34:7c:25:53:db:45 [ether] on enp0s3
? (192.170.77.149) at 7e:24:34:c6:10:99 [ether] on calif4ae9085e4a
? (192.170.77.148) at 7a:cf:86:10:b6:33 [ether] on cali8bbfee2ad9e
if r.ipVersion == 4 && target.DestMAC != nil {
// TODO(smc) clean up/sync old ARP entries
err := r.addStaticARPEntry(cidr, target.DestMAC, ifaceName)
if err != nil {
logCxt.WithError(err).Warn("Failed to set ARP entry")
updatesFailed = true
}
}
4. SetRoutes / SetL2Routes:主要在 iface 或 iface 地址发生变化时被调用,把目标分别存入 pendingIfaceNameToTargets / pendingIfaceNameToL2Targets,并将接口加入 dirtyIfaces
// SetRoutes queues targets as the pending route update for ifaceName and
// marks the interface dirty. Nothing is programmed here; the slice is only
// stored in pendingIfaceNameToTargets (it is not copied).
func (r *RouteTable) SetRoutes(ifaceName string, targets []Target) {
	r.dirtyIfaces.Add(ifaceName)
	r.pendingIfaceNameToTargets[ifaceName] = targets
}
// SetL2Routes queues targets as the pending L2 update for ifaceName and
// marks the interface dirty. Nothing is programmed here; the slice is only
// stored in pendingIfaceNameToL2Targets (it is not copied).
func (r *RouteTable) SetL2Routes(ifaceName string, targets []L2Target) {
	r.dirtyIfaces.Add(ifaceName)
	r.pendingIfaceNameToL2Targets[ifaceName] = targets
}
总结:
本文分析得比较简单,主要内容是:根据 iface 的变化,同步 L2(ARP/FDB)表项或 L3 路由