Contiv is Cisco's container networking component and the first Docker-certified container networking product; it is written in Go.
Key features
Contiv's control plane consists of two main components, Netmaster and Netplugin; the data plane supports Open vSwitch and VPP (Vector Packet Processor); the datastore can be etcd or Consul. Contiv's capabilities can be summed up as the "3 Anys":
- Any Infrastructure
- Any Platform
- Any Network
Deployment architecture
The diagram below shows Contiv's deployment architecture in K8s (this article is based on the author's use of Contiv with K8s). Netmaster is deployed as three replicas for high availability, Netplugin as a DaemonSet, OVS as a DaemonSet, and the datastore is etcd (a three-node HA deployment).
Network architecture
The Contiv network is built on SDN principles: the controller consists of multiple ofnetMaster replicas plus an ofnetAgent resident on each node. ofnet talks to Open vSwitch over the OpenFlow protocol to push policies and other configuration.
How it works
Datastore (etcd)
Key configuration paths:
Configured EPs: /contiv.io/state/eps
Operational EPs: /contiv.io/oper/eps
Networks: /contiv.io/state/nets
Services: /contiv.io/service
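The configured tree (/contiv.io/state/eps) holds the intended endpoints, while the operational tree (/contiv.io/oper/eps) holds the realized ones; netplugin compares the two to find endpoints it still has to create. A minimal sketch of that comparison (the helper name is hypothetical; the real logic lives in Contiv's state drivers):

```go
package main

import "fmt"

// pendingEndpoints returns the endpoint IDs present in the configured
// tree (/contiv.io/state/eps) but missing from the operational tree
// (/contiv.io/oper/eps), i.e. those still to be handed to the OVS driver.
func pendingEndpoints(cfg, oper []string) []string {
	seen := make(map[string]bool, len(oper))
	for _, id := range oper {
		seen[id] = true
	}
	var pending []string
	for _, id := range cfg {
		if !seen[id] {
			pending = append(pending, id)
		}
	}
	return pending
}

func main() {
	cfg := []string{"net001.tenant001-abc", "net001.tenant001-def"}
	oper := []string{"net001.tenant001-abc"}
	fmt.Println(pendingEndpoints(cfg, oper)) // [net001.tenant001-def]
}
```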
Processing flows for the major scenarios
Initialization
- netmaster
1. Initialize the datastore driver (etcd)
2. Register services: netmaster and netmaster.rpc
3. Discover all netplugins and build the ofnetMaster <--> ofnetAgent network model
4. Run the state machine: acquire the leader lock and transition to the leader state (the others become followers)
Leader:
1). Start the controller and set up routes for front-end configuration (the netctl CLI and the UI)
2). Set up routes for netplugin requests (endpoint creation, etc.)
3). Initialize the policy manager and restore network policies from the datastore
4). Start services
Follower:
1). Start services
- netplugin
1. Initialize the datastore driver (etcd)
2. Initialize the data-plane forwarding driver (OVS)
3. Restore all of this node's resources from the datastore, including networks, endpoints, and policies
4. Discover all netmasters and all other netplugins (mainly to set up VXLAN tunnels); set up routes for CNI requests (Pod creation and deletion)
5. Start services
Creating a network
1. netmaster receives the network command, initializes the network (mainly the subnet, via a bitmap allocator), and writes it to etcd;
2. etcd notifies the netplugins of the network-creation event; the two network types are handled differently:
1). infra networks: each node creates a host interface named after the network and assigns it an IP address;
2). data networks: mainly configured into OVS, e.g. the VLAN information.
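The bitmap allocation mentioned in step 1 can be sketched as follows. This is a simplified illustration, not netmaster's actual allocator: bit i set means the i-th address offset in the subnet range is free.

```go
package main

import (
	"fmt"
	"math/big"
)

// bitmapAllocator hands out offsets within a subnet using a bitmap,
// in the spirit of netmaster's subnet IP allocation.
type bitmapAllocator struct {
	bits *big.Int
	size int
}

func newBitmapAllocator(size int) *bitmapAllocator {
	b := big.NewInt(0)
	for i := 0; i < size; i++ {
		b.SetBit(b, i, 1) // mark every address offset free
	}
	return &bitmapAllocator{bits: b, size: size}
}

// Allocate returns the first free offset, or -1 if the range is exhausted.
func (a *bitmapAllocator) Allocate() int {
	for i := 0; i < a.size; i++ {
		if a.bits.Bit(i) == 1 {
			a.bits.SetBit(a.bits, i, 0)
			return i
		}
	}
	return -1
}

// Release marks an offset free again, e.g. after an endpoint is deleted.
func (a *bitmapAllocator) Release(i int) {
	a.bits.SetBit(a.bits, i, 1)
}

func main() {
	a := newBitmapAllocator(4)
	fmt.Println(a.Allocate(), a.Allocate()) // 0 1
	a.Release(0)
	fmt.Println(a.Allocate()) // 0
}
```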
Pod creation in K8s
1. On receiving a Pod-creation request from kubelet, netplugin sends an HTTP request to the netmaster leader to allocate an endpoint;
2. netmaster creates the endpoint, allocates an IP address, and saves the endpoint to etcd;
3. After obtaining the endpoint, netplugin compares /contiv.io/state/eps with /contiv.io/oper/eps to confirm it is a newly created endpoint, then hands it to the OVS driver;
4. The OVS driver creates a veth pair (an internal-type port for infra networks) and attaches the two ends to the container and the OVS bridge; it also sends an RPC request to netmaster, which broadcasts to all other netplugins so that each updates its local flow tables (for ARP proxying and network policies); the endpoint is recorded in etcd under /contiv.io/oper/eps;
5. The container is configured with its IP address and routes.
Pod deletion in K8s
1. On receiving a Pod-deletion request from kubelet, netplugin first queries K8s for the Pod's labels, including tenant, network, and group (in order to match the endpoint in etcd; endpoints are named network.tenant-pauseContainerID);
2. The OVS driver deletes the interface and the endpoint record in etcd, then sends an RPC request to netmaster, which broadcasts to all other netplugins so that each updates its local flow tables;
3. netplugin asks netmaster to delete the endpoint.
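The endpoint matching in step 1 follows the naming rule quoted above; a small helper (illustrative, not Contiv's actual code) makes the rule concrete:

```go
package main

import "fmt"

// epKey builds the endpoint name used to match a Pod back to its etcd
// record, following the rule: network.tenant-pauseContainerID.
func epKey(network, tenant, pauseContainerID string) string {
	return fmt.Sprintf("%s.%s-%s", network, tenant, pauseContainerID)
}

func main() {
	fmt.Println(epKey("net001", "tenant001", "3f9a")) // net001.tenant001-3f9a
}
```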
Support for K8s Services
1. Each netplugin watches K8s Services and Endpoints, configures them into ofnetAgent (which acts as the OVS controller), and pushes the corresponding rules into the OVS OpenFlow tables;
2. When OVS receives a packet destined for a Service IP, it punts the packet to the controller, which selects a provider (endpoint) based on the packet's source IP and installs bidirectional flow rules into OVS (a DNAT rule in the request direction and an SNAT rule in the reply direction).
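Step 2's provider selection can be sketched as below. The real logic is allocateProvider in ofnetSvcProxy.go; hashing the client IP here is an assumption for illustration, not necessarily Contiv's exact policy, but it shows why the choice must be deterministic per client: the installed DNAT/SNAT flow rules have to stay consistent for an established flow.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickProvider chooses a backend (provider) for a Service based on the
// client's source IP, so the same client keeps hitting the same provider.
func pickProvider(clientIP string, providers []string) (string, error) {
	if len(providers) == 0 {
		return "", fmt.Errorf("no providers for service")
	}
	h := fnv.New32a()
	h.Write([]byte(clientIP))
	return providers[h.Sum32()%uint32(len(providers))], nil
}

func main() {
	providers := []string{"10.83.52.101", "10.83.52.102"}
	p1, _ := pickProvider("10.83.53.1", providers)
	p2, _ := pickProvider("10.83.53.1", providers)
	fmt.Println(p1 == p2) // true: same client, same provider
}
```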
Configuration management
netctl
[root@test]# netctl net ls -a
Tenant     Network    Nw Type  Encap type  Packet tag  Subnet                        Gateway       IPv6Subnet  IPv6Gateway  Cfgd Tag
------     -------    -------  ----------  ----------  ------                        -------       ----------  -----------  --------
data       test001    data     vlan        983         10.83.52.101-10.83.53.253/23  10.83.53.254
default    contivgw1  infra    vlan        980         10.80.52.1-10.80.52.100/23
t12345678  testabc    data     vlan        989         10.89.52.0/23                 10.89.53.254
t12345678  testccc    data     vlan        981         10.81.52.101-10.81.53.253/23  10.81.53.254
WebUI(https://netmasterip:10000)
etcdctl
[root@test]# etcdctl --endpoints=http://etcdip:6666 ls /contiv.io/
/contiv.io/lock
/contiv.io/state
/contiv.io/oper
/contiv.io/master
/contiv.io/obj
/contiv.io/service
Multi-tenant networking
- Create a tenant
- Create a network
- Create an endpointGroup (EPG) (optional)
- Specify the tenant, the network (and the EPG) in the Pod's labels:
apiVersion: v1
kind: ReplicationController
metadata:
  name: test001
spec:
  replicas: 2
  template:
    metadata:
      name: webapp
      labels:
        app: webapp
        io.contiv.net-group: epg001
        io.contiv.network: net001
        io.contiv.tenant: tenant001
Code walkthrough
- netmaster initialization (code: netmaster\main.go, netmaster\daemon\daemon.go)
// Start netmaster initialization
func startNetMaster(netmaster *daemon.MasterDaemon) {
// initialize master daemon
netmaster.Init()
// start monitoring services
netmaster.InitServices()
// Run daemon FSM
netmaster.RunMasterFsm()
}
// Register services. They can be inspected with: etcdctl --endpoints=http://etcdip:6666 ls /contiv.io/service/netmaster.rpc
func (d *MasterDaemon) registerService() {
var err error
ctrlURL := strings.Split(d.ControlURL, ":")
masterIP := ctrlURL[0]
masterPort, _ := strconv.Atoi(ctrlURL[1])
// service info
srvInfo := objdb.ServiceInfo{
ServiceName: "netmaster",
TTL: 10,
HostAddr: masterIP,
Port: masterPort,
Role: d.currState,
}
...
}
// Run the state machine. agentDiscoveryLoop() discovers all registered netplugins; acquiring leaderLock promotes this instance to leader, and runLeader() restores state and starts the services.
func (d *MasterDaemon) RunMasterFsm() {
var err error
masterURL := strings.Split(d.ControlURL, ":")
// validate the URL before indexing into the split result
if len(masterURL) != 2 {
log.Fatalf("Invalid netmaster URL")
}
masterIP, masterPort := masterURL[0], masterURL[1]
// create new ofnet master
d.ofnetMaster = ofnet.NewOfnetMaster(masterIP, ofnet.OFNET_MASTER_PORT)
if d.ofnetMaster == nil {
log.Fatalf("Error creating ofnet master")
}
// Register all existing netplugins in the background
go d.agentDiscoveryLoop()
// Create the lock
leaderLock, err = d.objdbClient.NewLock("netmaster/leader", masterIP+":"+masterPort, leaderLockTTL)
if err != nil {
log.Fatalf("Could not create leader lock. Err: %v", err)
}
// Try to acquire the lock
err = leaderLock.Acquire(0)
if err != nil {
// We dont expect any error during acquire.
log.Fatalf("Error while acquiring lock. Err: %v", err)
}
...
}
- netplugin initialization (code: netplugin\netd.go, netplugin\agent\agent.go, netplugin\cluster\cluster.go)
// Start netplugin
func startNetPlugin(pluginConfig *plugin.Config) {
// Create a new agent
ag := agent.NewAgent(pluginConfig)
// Process all current state
ag.ProcessCurrentState()
// post initialization processing
ag.PostInit()
// handle events
if err := ag.HandleEvents(); err != nil {
logrus.Errorf("Netplugin exiting due to error: %v", err)
os.Exit(1)
}
}
// NewAgent initializes a plugin, which sets up the datastore (etcd) and the data-plane driver (OVS); the OVS driver creates the OVS bridges in ovsdriver.go (netplugin\drivers\ovsd\ovsdriver.go) as well as the ofnetAgent (netplugin\vendor\github.com\contiv\ofnet\ofnetAgent.go)
func (p *NetPlugin) Init(pluginConfig Config) error {
var err error
if pluginConfig.Instance.HostLabel == "" {
return core.Errorf("empty host-label passed")
}
// initialize state driver
p.StateDriver, err = utils.GetStateDriver()
if err != nil {
p.StateDriver, err = utils.NewStateDriver(pluginConfig.Drivers.State, &pluginConfig.Instance)
if err != nil {
return err
}
}
defer func() {
if err != nil {
utils.ReleaseStateDriver()
}
}()
// set state driver in instance info
pluginConfig.Instance.StateDriver = p.StateDriver
err = InitGlobalSettings(p.StateDriver, &pluginConfig.Instance)
if err != nil {
return err
}
// initialize network driver
p.NetworkDriver, err = utils.NewNetworkDriver(pluginConfig.Drivers.Network, &pluginConfig.Instance)
if err != nil {
return err
}
p.PluginConfig = pluginConfig
defer func() {
if err != nil {
p.NetworkDriver.Deinit()
}
}()
return nil
}
// Restore networks, endpoints, etc. by reading the records stored in etcd
func (ag *Agent) ProcessCurrentState() error {
opts := ag.pluginConfig.Instance
readNet := &mastercfg.CfgNetworkState{}
readNet.StateDriver = ag.netPlugin.StateDriver
netCfgs, err := readNet.ReadAll()
if err == nil {
for idx, netCfg := range netCfgs {
net := netCfg.(*mastercfg.CfgNetworkState)
log.Debugf("read net key[%d] %s, populating state \n", idx, net.ID)
processNetEvent(ag.netPlugin, net, false, opts)
if net.NwType == "infra" {
processInfraNwCreate(ag.netPlugin, net, opts)
}
}
}
...
}
// Register the netplugin service; read this together with netmaster's netplugin discovery during initialization.
func RunLoop(netplugin *plugin.NetPlugin, ctrlIP, vtepIP, hostname string) error {
// Register ourselves
err := registerService(ObjdbClient, ctrlIP, vtepIP, hostname, netplugin.PluginConfig.Instance.VxlanUDPPort)
// Start peer discovery loop
go peerDiscoveryLoop(netplugin, ObjdbClient, ctrlIP, vtepIP)
return err
}
- Pod creation and deletion (code: netplugin\mgmtfn\k8splugin\driver.go)
func addPod(w http.ResponseWriter, r *http.Request, vars map[string]string) (interface{}, error) {
resp := cniapi.RspAddPod{}
logEvent("add pod")
content, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Errorf("Failed to read request: %v", err)
return resp, err
}
...
}
func deletePod(w http.ResponseWriter, r *http.Request, vars map[string]string) (interface{}, error) {
resp := cniapi.RspAddPod{}
logEvent("del pod")
content, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Errorf("Failed to read request: %v", err)
return resp, err
}
...
}
// Where are the two handlers above registered?
func InitCNIServer(netplugin *plugin.NetPlugin) error {
netPlugin = netplugin
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("Could not retrieve hostname: %v", err)
}
pluginHost = hostname
// Set up the api client instance
kubeAPIClient = setUpAPIClient()
if kubeAPIClient == nil {
log.Fatalf("Could not init kubernetes API client")
}
log.Debugf("Configuring router")
router := mux.NewRouter()
// register handlers for cni
t := router.Headers("Content-Type", "application/json").Methods("POST").Subrouter()
t.HandleFunc(cniapi.EPAddURL, utils.MakeHTTPHandler(addPod))
t.HandleFunc(cniapi.EPDelURL, utils.MakeHTTPHandler(deletePod))
t.HandleFunc("/ContivCNI.{*}", utils.UnknownAction)
...
}
- Cooperation between ofnetMaster and ofnetAgent (code: netplugin\vendor\github.com\contiv\ofnet\ofnetMaster.go, netplugin\vendor\github.com\contiv\ofnet\ofnetAgent.go)
// ofnetAgent adds a netmaster, building the ofnetMaster/ofnetAgent topology shown in the diagram above
func (self *OfnetAgent) AddMaster(masterInfo *OfnetNode, ret *bool) error {
master := new(OfnetNode)
master.HostAddr = masterInfo.HostAddr
master.HostPort = masterInfo.HostPort
var resp bool
log.Infof("Adding master: %+v", *master)
masterKey := fmt.Sprintf("%s:%d", masterInfo.HostAddr, masterInfo.HostPort)
// Save it in DB
self.masterDbMutex.Lock()
self.masterDb[masterKey] = master
self.masterDbMutex.Unlock()
// increment stats
self.incrStats("MasterAdd")
// My info to send to master
myInfo := new(OfnetNode)
myInfo.HostAddr = self.MyAddr
myInfo.HostPort = self.MyPort
// Register the agent with the master
err := rpcHub.Client(master.HostAddr, master.HostPort).Call("OfnetMaster.RegisterNode", &myInfo, &resp)
if err != nil {
log.Errorf("Failed to register with the master %+v. Err: %v", master, err)
// increment stats
self.incrErrStats("RegisterNodeFailure")
return err
}
...
}
// ofnetAgent adds a local endpoint (one allocated for a Pod on this node); it sends an RPC request to ofnetMaster, which broadcasts it to all other ofnetAgents.
func (self *OfnetAgent) AddLocalEndpoint(endpoint EndpointInfo) error {
// Call the datapath
err := self.datapath.AddLocalEndpoint(*epreg)
if err != nil {
log.Errorf("Adding endpoint (%+v) to datapath. Err: %v", epreg, err)
return err
}
// Add the endpoint to local routing table
self.endpointDb.Set(epId, epreg)
self.localEndpointDb.Set(string(endpoint.PortNo), epreg)
// Send the endpoint to all known masters
self.masterDbMutex.Lock()
for _, master := range self.masterDb {
var resp bool
log.Infof("Sending endpoint %+v to master %+v", epreg, master)
// Make the RPC call to add the endpoint to master
err := rpcHub.Client(master.HostAddr, master.HostPort).Call("OfnetMaster.EndpointAdd", epreg, &resp)
if err != nil {
log.Errorf("Failed to add endpoint %+v to master %+v. Err: %v", epreg, master, err)
// Continue sending the message to other masters.
} else {
// increment stats
self.incrStats("EndpointAddSent")
}
}
...
}
// ofnetMaster registers a node, building the ofnetMaster/ofnetAgent topology shown in the diagram above
func (self *OfnetMaster) RegisterNode(hostInfo *OfnetNode, ret *bool) error {
// Create a node
node := new(OfnetNode)
node.HostAddr = hostInfo.HostAddr
node.HostPort = hostInfo.HostPort
hostKey := fmt.Sprintf("%s:%d", hostInfo.HostAddr, hostInfo.HostPort)
// Add it to DB
self.masterMutex.Lock()
self.agentDb[hostKey] = node
self.masterMutex.Unlock()
log.Infof("Registered node: %+v", node)
// take a read lock for accessing db
self.masterMutex.RLock()
defer self.masterMutex.RUnlock()
// Send all existing endpoints to the new node
for _, endpoint := range self.endpointDb {
if node.HostAddr != endpoint.OriginatorIp.String() {
var resp bool
log.Infof("Sending endpoint: %+v to node %s:%d", endpoint, node.HostAddr, node.HostPort)
client := rpcHub.Client(node.HostAddr, node.HostPort)
err := client.Call("OfnetAgent.EndpointAdd", endpoint, &resp)
if err != nil {
log.Errorf("Error adding endpoint to %s. Err: %v", node.HostAddr, err)
// continue sending other endpoints
}
}
}
...
}
// On receiving an endpoint-creation request from an ofnetAgent, broadcast it to all other ofnetAgents
func (self *OfnetMaster) EndpointAdd(ep *OfnetEndpoint, ret *bool) error {
log.Infof("Received Endpoint CReate from Remote netplugin")
// Check if we have the endpoint already and which is more recent
self.masterMutex.RLock()
oldEp := self.endpointDb[ep.EndpointID]
self.masterMutex.RUnlock()
if oldEp != nil {
// If old endpoint has more recent timestamp, nothing to do
if !ep.Timestamp.After(oldEp.Timestamp) {
return nil
}
}
// Save the endpoint in DB
self.masterMutex.Lock()
self.endpointDb[ep.EndpointID] = ep
self.masterMutex.Unlock()
// take a read lock for accessing db
self.masterMutex.RLock()
defer self.masterMutex.RUnlock()
// Publish it to all agents except where it came from
for nodeKey, node := range self.agentDb {
if node.HostAddr != ep.OriginatorIp.String() {
var resp bool
log.Infof("Sending endpoint: %+v to node %s:%d", ep, node.HostAddr, node.HostPort)
client := rpcHub.Client(node.HostAddr, node.HostPort)
err := client.Call("OfnetAgent.EndpointAdd", ep, &resp)
...
}
- Support for K8s Services (code: netplugin\mgmtfn\k8splugin\cniserver.go, plus the data plane in netplugin\vendor\github.com\contiv\ofnet\ofnetSvcProxy.go)
func InitKubServiceWatch(np *plugin.NetPlugin) {
watchClient := setUpAPIClient()
if watchClient == nil {
log.Fatalf("Could not init kubernetes API client")
}
svcCh := make(chan SvcWatchResp, 1)
epCh := make(chan EpWatchResp, 1)
go func() {
for {
select {
case svcEvent := <-svcCh:
switch svcEvent.opcode {
case "WARN":
log.Debugf("svcWatch : %s", svcEvent.errStr)
break
case "FATAL":
log.Errorf("svcWatch : %s", svcEvent.errStr)
break
case "ERROR":
log.Warnf("svcWatch : %s", svcEvent.errStr)
watchClient.WatchServices(svcCh)
...
}
func (proxy *ServiceProxy) addProvider(svcIP, provIP string) error {
}
func (proxy *ServiceProxy) addService(svcName string) error {
}
func (proxy *ServiceProxy) ProviderUpdate(svcName string, providers []string) {
}
// Entry point where OVS punts Service-bound packets up to ofnet. The caller sits on the OVS bridge's packet-receive path, e.g. vlanBridge.go->PacketRcvd()
func (proxy *ServiceProxy) HandlePkt(pkt *ofctrl.PacketIn) {
if pkt.TableId != SRV_PROXY_DNAT_TBL_ID {
return // ignore other packets
}
if pkt.Data.Ethertype != protocol.IPv4_MSG {
return // ignore non-IP pkts
}
if pkt.Data.HWSrc.String() == "00:00:11:11:11:11" {
log.Warnf("Pkt received with our src mac. Loop???")
return // pkt we sent
}
ip := pkt.Data.Data.(*protocol.IPv4)
svcIP := ip.NWDst.String()
log.Infof("HandlePkt svcIP: %s", svcIP)
proxy.oMutex.Lock()
defer proxy.oMutex.Unlock()
operEntry, found := proxy.operState[svcIP]
if !found {
return // this means service was just deleted
}
clientIP := ip.NWSrc.String()
provIP, err := operEntry.allocateProvider(clientIP)
if err != nil {
log.Warnf("allocateProvider failed for %s - %v", svcIP, err)
return
}
...
}
Defects and limitations
- The OVS version shipped with 1.2.1 is 2.5.2, which lacks stateful (conntrack) flow tables, so K8s Service support is incomplete: a Pod cannot reach both the Service IP and the endpoint IPs at the same time
- OVS's forwarding rules do not allow a packet to be sent back out its in_port, so some fat-container scenarios are not supported
- Forced Pod deletion is not supported
- In 1.2.1, the MAC address of an infra-network interface is not restored after a host reboot
- stats_exporter has a bug when handling networks with no allocated addresses
References
https://contiv.io/documents/community/index.html
https://blogs.cisco.com/
https://github.com/contiv