源码路径: github.com/projectcalico/felix/dataplane/linux/endpoint_mgr.go
结构体 endpointManager
wlIfacesRegexp:calico iface 正则表达式,^cali.*
kubeIPVSSupportEnabled:根据网卡 kube-ipvs0 是否存在,判断是否启用对 kube-proxy IPVS 模式的支持
// endpointManager manages the dataplane resources that belong to each endpoint as well as // the "dispatch chains" that fan out packets to the right per-endpoint chain. // // It programs the relevant iptables chains (via the iptables.Table objects) along with // per-endpoint routes (via the RouteTable). // // Since calculating the dispatch chains is fairly expensive, the main OnUpdate method // simply records the pending state of each interface and defers the actual calculation // to CompleteDeferredWork(). This is also the basis of our failure handling; updates // that fail are left in the pending state so they can be retried later. type endpointManager struct { // Config. ipVersion uint8 wlIfacesRegexp *regexp.Regexp kubeIPVSSupportEnabled bool
NewIntDataplaneDriver
--> newEndpointManager
1. newEndpointManager 函数
实例化 endpointManager 结构,包括 raw、mangle、filter 等 iptables 表;callbacks 由 newCallbacks 实例化,包括 policy members、ipset、interface、hostendpoint、workloadendpoint 等回调
// newEndpointManager constructs an endpointManager for production use.
//
// It is a thin wrapper: every argument is handed on unchanged to
// newEndpointManagerWithShims, with writeProcSys inserted between the
// workload-endpoint status callback and the callbacks registry.
func newEndpointManager(
	rawTable iptablesTable,
	mangleTable iptablesTable,
	filterTable iptablesTable,
	ruleRenderer rules.RuleRenderer,
	routeTable routeTable,
	ipVersion uint8,
	epMarkMapper rules.EndpointMarkMapper,
	kubeIPVSSupportEnabled bool,
	wlInterfacePrefixes []string,
	onWorkloadEndpointStatusUpdate EndpointStatusUpdateCallback,
	callbacks *callbacks,
) *endpointManager {
	// Build the manager first, then return it, so the construction step is
	// easy to break on when debugging.
	mgr := newEndpointManagerWithShims(
		// The three iptables tables, in raw/mangle/filter order.
		rawTable, mangleTable, filterTable,
		ruleRenderer,
		routeTable,
		ipVersion,
		epMarkMapper,
		kubeIPVSSupportEnabled,
		wlInterfacePrefixes,
		onWorkloadEndpointStatusUpdate,
		writeProcSys,
		callbacks,
	)
	return mgr
}
loopUpdatingDataplane
--> processMsgFromCalcGraph
--> mgr.OnUpdate
--> processIfaceUpdate
--> mgr.OnUpdate
2. OnUpdate 函数
类型为 WorkloadEndpoint 的 update 或者 remove 操作,pendingWlEpUpdates 记录待更新的消息(remove 时记录为 nil)
msg=id:<orchestrator_id:"k8s" workload_id:"default/kube-webhook-55fbb8c97f-2xxcw" endpoint_id:"eth0" > endpoint:<state:"active" name:"calie01333a0edc" profile_ids:"kns.default" profile_ids:"ksa.default.webhook-server-sa" ipv4_nets:"192.170.77.134/32" >
WorkloadEndpoint 定义在 github.com/projectcalico/felix/proto/felixbackend.pb.go
// WorkloadEndpoint is the protobuf message describing one workload endpoint,
// as delivered to the endpoint manager inside a WorkloadEndpointUpdate.
type WorkloadEndpoint struct {
	// State is the administrative state; the endpoint manager treats the
	// value "active" as admin-up.
	State string `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
	// Name is the host-side interface name (e.g. "calie01333a0edc"); it is
	// used as the key for per-endpoint chains and routes.
	Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	// Mac is the endpoint's MAC address in string form.
	Mac string `protobuf:"bytes,3,opt,name=mac,proto3" json:"mac,omitempty"`
	// ProfileIds lists the profile IDs applied to the endpoint
	// (e.g. "kns.default").
	ProfileIds []string `protobuf:"bytes,4,rep,name=profile_ids,json=profileIds" json:"profile_ids,omitempty"`
	// Ipv4Nets holds the endpoint's IPv4 networks in CIDR notation
	// (e.g. "192.170.77.134/32").
	Ipv4Nets []string `protobuf:"bytes,5,rep,name=ipv4_nets,json=ipv4Nets" json:"ipv4_nets,omitempty"`
	// Ipv6Nets holds the endpoint's IPv6 networks; presumably CIDR strings
	// like Ipv4Nets.
	Ipv6Nets []string `protobuf:"bytes,6,rep,name=ipv6_nets,json=ipv6Nets" json:"ipv6_nets,omitempty"`
	// Tiers carries the policy tiers; the endpoint manager reads the ingress
	// and egress policy names from the first tier only.
	Tiers []*TierInfo `protobuf:"bytes,7,rep,name=tiers" json:"tiers,omitempty"`
	// Ipv4Nat holds IPv4 NAT entries for the endpoint.
	Ipv4Nat []*NatInfo `protobuf:"bytes,8,rep,name=ipv4_nat,json=ipv4Nat" json:"ipv4_nat,omitempty"`
	// Ipv6Nat holds IPv6 NAT entries for the endpoint.
	Ipv6Nat []*NatInfo `protobuf:"bytes,9,rep,name=ipv6_nat,json=ipv6Nat" json:"ipv6_nat,omitempty"`
}
func (m *endpointManager) OnUpdate(protoBufMsg interface{}) {
log.WithField("msg", protoBufMsg).Debug("Received message")
switch msg := protoBufMsg.(type) {
case *proto.WorkloadEndpointUpdate:
m.pendingWlEpUpdates[*msg.Id] = msg.Endpoint
case *proto.WorkloadEndpointRemove:
m.pendingWlEpUpdates[*msg.Id] = nil
2.1 类型为 HostEndpoint 的 update 或者 remove 操作
case *proto.HostEndpointUpdate:
log.WithField("msg", msg).Debug("Host endpoint update")
m.callbacks.InvokeUpdateHostEndpoint(*msg.Id)
m.rawHostEndpoints[*msg.Id] = msg.Endpoint
m.hostEndpointsDirty = true
m.epIDsToUpdateStatus.Add(*msg.Id)
case *proto.HostEndpointRemove:
log.WithField("msg", msg).Debug("Host endpoint removed")
m.callbacks.InvokeRemoveHostEndpoint(*msg.Id)
delete(m.rawHostEndpoints, *msg.Id)
m.hostEndpointsDirty = true
m.epIDsToUpdateStatus.Add(*msg.Id)
2.2 类型为 iface 的 update 或者 地址更新操作
待更新的存入 pendingIfaceUpdates 中,如果接口地址更新则存入 hostIfaceToAddrs
case *ifaceUpdate:
log.WithField("update", msg).Debug("Interface state changed.")
m.pendingIfaceUpdates[msg.Name] = msg.State
case *ifaceAddrsUpdate:
log.WithField("update", msg).Debug("Interface addrs changed.")
if m.wlIfacesRegexp.MatchString(msg.Name) {
log.WithField("update", msg).Debug("Workload interface, ignoring.")
return
}
if msg.Addrs != nil {
m.hostIfaceToAddrs[msg.Name] = msg.Addrs
} else {
delete(m.hostIfaceToAddrs, msg.Name)
}
m.hostEndpointsDirty = true
}
loopUpdatingDataplane
--> CompleteDeferredWork.Apply
--> mgr.CompleteDeferredWork
3. CompleteDeferredWork 函数
遍历所有待更新的 iface,如果接口处于 up 状态,则加入到 activeUpIfaces 集合中(否则从该集合中移除);如果接口名匹配 wlIfacesRegexp(例如 caliXXXXX),则同时加入到 wlIfaceNamesToReconfigure 集合中
func (m *endpointManager) CompleteDeferredWork() error {
// Copy the pending interface state to the active set and mark any interfaces that have
// changed state for reconfiguration by resolveWorkload/HostEndpoints()
for ifaceName, state := range m.pendingIfaceUpdates {
if state == ifacemonitor.StateUp {
m.activeUpIfaces.Add(ifaceName)
if m.wlIfacesRegexp.MatchString(ifaceName) {
log.WithField("ifaceName", ifaceName).Info(
"Workload interface came up, marking for reconfiguration.")
m.wlIfaceNamesToReconfigure.Add(ifaceName)
}
} else {
m.activeUpIfaces.Discard(ifaceName)
}
// If this interface is linked to any already-existing endpoints, mark the endpoint
// status for recalculation. If the matching endpoint changes when we do
// resolveHostEndpoints() then that will mark old and new matching endpoints for
// update.
m.markEndpointStatusDirtyByIface(ifaceName)
// Clean up as we go...
delete(m.pendingIfaceUpdates, ifaceName)
}
3.1 resolveWorkloadEndpoints 函数
func (m *endpointManager) resolveWorkloadEndpoints() {
if len(m.pendingWlEpUpdates) > 0 {
// We're about to make endpoint updates, make sure we recheck the dispatch chains.
m.needToCheckDispatchChains = true
}
3.1.1 循环遍历所有待更新的 workloadendpoint,如果旧的与新的接口名不一致,则先清理旧的 filter 规则链、route 表等状态,再更新
// Update any dirty endpoints.
for id, workload := range m.pendingWlEpUpdates {
logCxt := log.WithField("id", id)
oldWorkload := m.activeWlEndpoints[id]
if workload != nil {
logCxt.Info("Updating per-endpoint chains.")
if oldWorkload != nil && oldWorkload.Name != workload.Name {
logCxt.Debug("Interface name changed, cleaning up old state")
m.epMarkMapper.ReleaseEndpointMark(oldWorkload.Name)
m.filterTable.RemoveChains(m.activeWlIDToChains[id])
m.routeTable.SetRoutes(oldWorkload.Name, nil)
m.wlIfaceNamesToReconfigure.Discard(oldWorkload.Name)
delete(m.activeWlIfaceNameToID, oldWorkload.Name)
}
3.1.2 更新 filter 规则链
var ingressPolicyNames, egressPolicyNames []string
if len(workload.Tiers) > 0 {
ingressPolicyNames = workload.Tiers[0].IngressPolicies
egressPolicyNames = workload.Tiers[0].EgressPolicies
}
adminUp := workload.State == "active"
chains := m.ruleRenderer.WorkloadEndpointToIptablesChains(
workload.Name,
m.epMarkMapper,
adminUp,
ingressPolicyNames,
egressPolicyNames,
workload.ProfileIds,
)
m.filterTable.UpdateChains(chains)
m.activeWlIDToChains[id] = chains
3.1.3 更新路由表
SetRoutes 则是 L3 层路由,pendingIfaceNameToTargets 存入待更新的路由
if adminUp {
logCxt.Debug("Endpoint up, adding routes")
for _, s := range ipStrings {
routeTargets = append(routeTargets, routetable.Target{
CIDR: ip.MustParseCIDROrIP(s),
DestMAC: mac,
})
}
} else {
logCxt.Debug("Endpoint down, removing routes")
}
m.routeTable.SetRoutes(workload.Name, routeTargets)
m.wlIfaceNamesToReconfigure.Add(workload.Name)
m.activeWlEndpoints[id] = workload
m.activeWlIfaceNameToID[workload.Name] = id
delete(m.pendingWlEpUpdates, id)
3.1.4 configureInterface 函数
/proc/sys/net/ipv4/conf/%s/rp_filter 写入 1,开启严格的反向路径校验。对每个进来的数据包,校验其反向路径是否是最佳路径。如果反向路径不是最佳路径,则直接丢弃该数据包。
/proc/sys/net/ipv4/conf/%s/route_localnet 写入 1,默认系统不转发127.0.0.1,需要配置规则才能起作用
/proc/sys/net/ipv4/neigh/%s/proxy_delay 写入 0,当接收到有一个arp请求时,在回应前可以延迟的时间,这个请求是要得到一个已知代理arp项的地址.
/proc/sys/net/ipv4/conf/%s/proxy_arp 写入 1,打开arp代理功能
/proc/sys/net/ipv4/conf/%s/forwarding 写入 1,在 iface 开启数据包转发功能
func (m *endpointManager) configureInterface(name string) error {
if !m.activeUpIfaces.Contains(name) {
log.WithField("ifaceName", name).Info(
"Skipping configuration of interface because it is oper down.")
return nil
}
log.WithField("ifaceName", name).Info(
"Applying /proc/sys configuration to interface.")
4. resolveHostEndpoints 函数
这个函数实在太长了,这里暂不展开分析