版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhonglinzhang/article/details/83028925
主要是调用runtime,这里默认为docker
0. 数据流
NewMainKubelet(cmd/kubelet/app/server.go) ->
NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime_manager.go) ->
- syncPod(pkg/kubelet/kubelet.go) ->
- SyncPod(pkg/kubelet/kuberuntime/kuberuntime_manager.go)
1. 数据结构
1.1 接口ContainerManager
管理运行在宿主机上的container,接口定义比较清晰
// Manages the containers running on a machine.
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
SystemCgroupsLimit() v1.ResourceList
// GetNodeConfig returns a NodeConfig that is being used by the container manager.
GetNodeConfig() NodeConfig
// Status returns internal Status.
Status() Status
// NewPodContainerManager is a factory method which returns a podContainerManager object
// Returns a noop implementation if qos cgroup hierarchy is not enabled
NewPodContainerManager() PodContainerManager
// GetMountedSubsystems returns the mounted cgroup subsystems on the node
GetMountedSubsystems() *CgroupSubsystems
// GetQOSContainersInfo returns the names of top level QoS containers
GetQOSContainersInfo() QOSContainersInfo
// GetNodeAllocatableReservation returns the amount of compute resources that have to be reserved from scheduling.
GetNodeAllocatableReservation() v1.ResourceList
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
// node allocatable (amount of total healthy resources reported by device plugin),
// and inactive device plugin resources previously registered on the node.
GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error
// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
// UpdatePluginResources calls Allocate of device plugin handler for potential
// requests for device plugin resources, and returns an error if fails.
// Otherwise, it updates allocatableResource in nodeInfo if necessary,
// to make sure it is at least equal to the pod's requested capacity for
// any registered device plugin resource
UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error
InternalContainerLifecycle() InternalContainerLifecycle
// GetPodCgroupRoot returns the cgroup which contains all pods.
GetPodCgroupRoot() string
// GetPluginRegistrationHandler returns a plugin registration handler
// The pluginwatcher's Handlers allow to have a single module for handling
// registration.
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
}
2. NewMainKubelet函数
runtime初始化,调用NewKubeGenericRuntimeManager函数
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
seccompProfileRoot,
containerRefManager,
machineInfo,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider,
klet.runtimeClassManager,
)
2.1 可以看到containerRuntime为runtime,实现了接口
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
3. syncPod函数
路径pkg/kubelet/kubelt.go
吧啦吧啦初始化一大堆,最终调用SyncPod,根据klet.containerRuntime = runtime,可以得到kubeGenericRuntimeManager实现了
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not return error if the only failures were pods in backoff
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
}
}
return nil
}
4. SyncPod函数
路径pkg/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
4.1 Step 1: Compute sandbox and container changes
确定哪些容器要创建,哪些容器要删除,需要删除的放入podContainerChanges.ContainersToKill,需要创建的放入podContainerChanges.ContainersToStart
podContainerChanges := m.computePodActions(pod, podStatus)
glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
4.2 Step 2: Kill the pod if the sandbox has changed
当沙箱变化的时候,需要重新创建pod
if podContainerChanges.KillPod {
if !podContainerChanges.CreateSandbox {
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
} else {
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
}
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
}
4.3 Step 3: kill any running containers in this pod which are not to keep
删除不需要的pod下的容器
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
return
}
}
}
4.4 Step 4: Create a sandbox for the pod if necessary
创建sandbox(容器标准),調用createPodSandbox,創建pod配置,創建pod log目錄,調用m.runtimeService.RunPodSandbox
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
return
}
glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
result.Fail(err)
return
}
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
4.5 Step 5: start the init container
init是为容器做初始化工作的,
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
return
}
glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
startContainerResult.Fail(err, msg)
utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
return
}
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
4.6 Step 6: start containers in podContainerChanges.ContainersToStart
真是真正启动容器进程,调用startContainer第五章节讲解
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
glog.V(3).Infof("container start failed: %v: %s", err, msg)
default:
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
}
continue
}
}
5. startContainer函数
路径:pkg/kubelet/kuberuntime/kuberuntime_container.go
5.1 Step 1: pull the image
一看就明白,拉去镜像
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return msg, err
}
5.2 Step 2: create the container
create container,最终调用API使用GRPC连接,相当于docker create操作
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
}
glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
if cleanupAction != nil {
defer cleanupAction()
}
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainerConfig
}
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainer
}
5.3 Step 3: start the container
相当于docker start这种操作
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dandling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
}