版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhonglinzhang/article/details/82800287
1、 interface结构体
(1.1) VolumePlugin接口
- 主要是被kubelet调用来实例化或者管理volume
// VolumePlugin is an interface to volume plugins that can be used on a
// kubernetes node (e.g. by kubelet) to instantiate and manage volumes.
type VolumePlugin interface {
	// Init hands the plugin its VolumeHost. VolumePluginMgr.InitPlugins calls
	// it once for each registered plugin; a plugin whose Init returns an error
	// is not added to the manager.
	Init(host VolumeHost) error
	// GetPluginName returns the name the plugin is registered under.
	// InitPlugins rejects names that are not qualified names and names that
	// were already registered.
	GetPluginName() string
	// GetVolumeName returns a name for the volume described by spec, or an
	// error. NOTE(review): presumably an identifier of the backing volume —
	// confirm against the implementations.
	GetVolumeName(spec *Spec) (string, error)
	// CanSupport reports whether this plugin can handle the given spec.
	CanSupport(spec *Spec) bool
	// RequiresRemount reports whether volumes of this plugin need remounting.
	// NOTE(review): presumably means mount calls must be re-executed
	// periodically — confirm.
	RequiresRemount() bool
	// NewMounter returns a Mounter for spec on behalf of pod podRef, using
	// opts, or an error.
	NewMounter(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (Mounter, error)
	// NewUnmounter returns an Unmounter for the volume called name that
	// belongs to the pod with UID podUID, or an error.
	NewUnmounter(name string, podUID types.UID) (Unmounter, error)
	// ConstructVolumeSpec builds a Spec from a volume name and the path it is
	// mounted at, or returns an error.
	ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)
	// SupportsMountOption reports whether the plugin supports mount options.
	SupportsMountOption() bool
	// SupportsBulkVolumeVerification reports whether the plugin supports bulk
	// volume verification. NOTE(review): presumably verifying many volumes in
	// a single call — confirm.
	SupportsBulkVolumeVerification() bool
}
2、 kubelet 启动注册volume plugins
- cmd/kubelet/app/server.go中run函数调用UnsecuredDependencies函数注册volume plugins,主要函数为ProbeVolumePlugins()
// UnsecuredDependencies assembles the kubelet.Dependencies for the kubelet
// server options s. In this excerpt the relevant part is that the volume
// plugin list is produced by ProbeVolumePlugins().
//
// NOTE(review): this is an abridged excerpt. mounter and writer are not
// defined in the visible lines, and the error return value and the function's
// closing brace are elided.
func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
	return &kubelet.Dependencies{
		Mounter: mounter,
		// Network plugins are probed from the configured CNI directories.
		NetworkPlugins: ProbeNetworkPlugins(s.CNIConfDir, s.CNIBinDir),
		OOMAdjuster: oom.NewOOMAdjuster(),
		OSInterface: kubecontainer.RealOS{},
		Writer: writer,
		// Volume plugins compiled into the kubelet binary are registered here.
		VolumePlugins: ProbeVolumePlugins(),
	}
(2.1) ProbeVolumePlugins函数
- 返回volume.VolumePlugin列表(接口如1.1所示),即kubelet内置的全部volume plugin;本文主要讲解host_path以及nfs
// ProbeVolumePlugins collects the volume plugins built into the kubelet
// binary and returns them as one flat list, in the order they are probed.
// The CSI plugin is appended last, and only when the CSIPersistentVolume
// feature gate is enabled.
func ProbeVolumePlugins() []volume.VolumePlugin {
	// Which plugins get probed is fixed by the kubelet binary itself, not by
	// dynamic linking or other "magic". The plugins are only collected here;
	// they are analyzed and initialized later.
	//
	// Kubelet does not currently need to configure volume plugins.
	// If/when it does, see kube-controller-manager/app/plugins.go for example of using volume.VolumeConfig
	probed := [][]volume.VolumePlugin{
		aws_ebs.ProbeVolumePlugins(),
		empty_dir.ProbeVolumePlugins(),
		gce_pd.ProbeVolumePlugins(),
		git_repo.ProbeVolumePlugins(),
		host_path.ProbeVolumePlugins(volume.VolumeConfig{}),
		nfs.ProbeVolumePlugins(volume.VolumeConfig{}),
		secret.ProbeVolumePlugins(),
		iscsi.ProbeVolumePlugins(),
		glusterfs.ProbeVolumePlugins(),
		rbd.ProbeVolumePlugins(),
		cinder.ProbeVolumePlugins(),
		quobyte.ProbeVolumePlugins(),
		cephfs.ProbeVolumePlugins(),
		downwardapi.ProbeVolumePlugins(),
		fc.ProbeVolumePlugins(),
		flocker.ProbeVolumePlugins(),
		azure_file.ProbeVolumePlugins(),
		configmap.ProbeVolumePlugins(),
		vsphere_volume.ProbeVolumePlugins(),
		azure_dd.ProbeVolumePlugins(),
		photon_pd.ProbeVolumePlugins(),
		projected.ProbeVolumePlugins(),
		portworx.ProbeVolumePlugins(),
		scaleio.ProbeVolumePlugins(),
		local.ProbeVolumePlugins(),
		storageos.ProbeVolumePlugins(),
	}

	// Start from an empty, non-nil slice and flatten the groups in order.
	allPlugins := []volume.VolumePlugin{}
	for _, group := range probed {
		allPlugins = append(allPlugins, group...)
	}

	// CSI is gated behind a feature flag.
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
		allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
	}
	return allPlugins
}
(2.2) host_path的ProbeVolumePlugins函数
- 路径pkg/volume/host_path/host_path.go
- hostPathPlugin实现了VolumePlugin接口
// ProbeVolumePlugins returns the host_path plugin as a one-element plugin
// list. The plugin carries volumeConfig; its host field is left at its zero
// value (nil) here.
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
	plugin := &hostPathPlugin{config: volumeConfig}
	return []volume.VolumePlugin{plugin}
}
// hostPathPlugin is the host_path volume plugin value that
// ProbeVolumePlugins hands back for registration.
type hostPathPlugin struct {
	// host is the VolumeHost the plugin runs against. ProbeVolumePlugins
	// leaves it nil. NOTE(review): presumably assigned in Init — confirm.
	host volume.VolumeHost
	// config is the VolumeConfig passed to ProbeVolumePlugins.
	config volume.VolumeConfig
}
3、 kubelet 初始化volume plugins
- pkg/kubelet/kubelet.go中NewMainKubelet函数中初始化volume plugin
- mountpod.NewManager函数中kubelet的root dir为 /var/lib/kubelet,目录下面有文件plugin-containers,plugins,pods
- InitPlugins初始化注册的所有volume plugin(3.1讲解)
// NewInitializedVolumePluginMgr builds a kubeletVolumeHost around kubelet,
// initializes every plugin in plugins against that host, and returns the
// host's embedded VolumePluginMgr.
//
// It returns an error if the mount pod manager cannot be created for the
// kubelet root directory, or if InitPlugins reports a failure.
func NewInitializedVolumePluginMgr(
	kubelet *Kubelet,
	secretManager secret.Manager,
	configMapManager configmap.Manager,
	plugins []volume.VolumePlugin,
	prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
	podMounts, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
	if err != nil {
		return nil, err
	}

	// volumePluginMgr is deliberately left at its zero value; InitPlugins
	// fills it in below.
	host := &kubeletVolumeHost{
		kubelet:          kubelet,
		secretManager:    secretManager,
		configMapManager: configMapManager,
		mountPodManager:  podMounts,
	}

	// The manager lives inside the host, and the host is what each plugin
	// receives in Init.
	mgr := &host.volumePluginMgr
	err = mgr.InitPlugins(plugins, prober, host)
	if err != nil {
		return nil, fmt.Errorf(
			"Could not initialize volume plugins for KubeletVolumePluginMgr: %v",
			err)
	}
	return mgr, nil
}
(3.1) InitPlugins函数
- 路径pkg/volume/plugins.go
- 调用各个plugin的init初始化
// InitPlugins registers the given plugins with the manager, calling Init(host)
// on each one. A plugin is skipped (and an error recorded) when its name is
// not a qualified name, when that name is already registered, or when its
// Init fails; the remaining plugins are still processed. All recorded errors
// are returned together as one aggregate.
//
// prober is accepted but not used in this excerpt.
func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error {
	if pm.plugins == nil {
		pm.plugins = map[string]VolumePlugin{}
	}

	failures := make([]error, 0)
	record := func(e error) { failures = append(failures, e) }

	for _, candidate := range plugins {
		pluginName := candidate.GetPluginName()

		nameProblems := validation.IsQualifiedName(pluginName)
		if len(nameProblems) != 0 {
			record(fmt.Errorf("volume plugin has invalid name: %q: %s", pluginName, strings.Join(nameProblems, ";")))
			continue
		}

		if _, duplicate := pm.plugins[pluginName]; duplicate {
			record(fmt.Errorf("volume plugin %q was registered more than once", pluginName))
			continue
		}

		if initErr := candidate.Init(host); initErr != nil {
			glog.Errorf("Failed to load volume plugin %s, error: %s", pluginName, initErr.Error())
			record(initErr)
			continue
		}

		// Only plugins that initialized successfully become visible.
		pm.plugins[pluginName] = candidate
		glog.V(1).Infof("Loaded volume plugin %q", pluginName)
	}

	return utilerrors.NewAggregate(failures)
}
4、 startKubelet函数关于volume部分
- 同步apiserver的pod信息,新增、删除的pod对volume状态进行同步更新
- 启动服务,监听controller manager(controller manager可助kubelet管理volume)
// startKubelet launches the kubelet's long-running loops, each in its own
// goroutine: the main kubelet loop fed by podCfg updates, the kubelet server
// when kubeCfg.EnableServer is set, and the read-only server when
// kubeCfg.ReadOnlyPort is positive. It returns without waiting for any of
// them.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies) {
	// runForever starts fn in a goroutine under wait.Until with a zero period
	// and the never-closing stop channel.
	runForever := func(fn func()) {
		go wait.Until(fn, 0, wait.NeverStop)
	}

	// Main kubelet loop.
	runForever(func() { k.Run(podCfg.Updates()) })

	// Kubelet server.
	if kubeCfg.EnableServer {
		runForever(func() {
			k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
		})
	}

	// Read-only server.
	if kubeCfg.ReadOnlyPort > 0 {
		runForever(func() {
			k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
		})
	}
}
(4.1)
- 调用volumeManager.Run于第五章节讲解
// Start the volume manager in its own goroutine: Run blocks until its stop
// channel is closed, and wait.NeverStop is passed as that channel here.
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
5、 VolumeManager 接口部分
(5.1)VolumeManager接口
- VolumeManager数据卷管理的核心接口
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Run starts the volume manager's asynchronous loops and blocks until
	// stopCh is closed.
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
	// WaitForAttachAndMount returns an error if waiting for pod's volumes
	// fails. NOTE(review): presumably blocks until every volume referenced by
	// pod is attached and mounted — confirm.
	WaitForAttachAndMount(pod *v1.Pod) error
	// GetMountedVolumesForPod returns a VolumeMap for the pod identified by
	// podName. NOTE(review): presumably only volumes that are already
	// mounted — confirm.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	// GetExtraSupplementalGroupsForPod returns extra supplemental group IDs
	// for pod. NOTE(review): presumably derived from the pod's volumes —
	// confirm.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
	// GetVolumesInUse returns the unique names of the volumes in use.
	// NOTE(review): the exact definition of "in use" is not visible here.
	GetVolumesInUse() []v1.UniqueVolumeName
	// ReconcilerStatesHasBeenSynced reports whether the reconciler's states
	// have been synced. NOTE(review): presumably at least once since kubelet
	// start — confirm.
	ReconcilerStatesHasBeenSynced() bool
	// VolumeIsAttached reports whether the volume named volumeName is
	// attached.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
	// MarkVolumesAsReportedInUse records the given volumes as having been
	// reported as in use.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
(5.2)volumeManager结构体
- volumeManager结构体实现了VolumeManager接口,主要有两个需要注意:
- desiredStateOfWorld:预期状态,意思就是volume需要被attach,哪些pods引用这个volume
- actualStateOfWorld:实际状态,也就是volume已经被attach到哪个node,哪个pod引用这个volume
// volumeManager implements the VolumeManager interface
type volumeManager struct {
	// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
	// communicate with the API server to fetch PV and PVC objects.
	kubeClient clientset.Interface
	// volumePluginMgr is the volume plugin manager used to access volume
	// plugins. It must be pre-initialized.
	volumePluginMgr *volume.VolumePluginMgr
	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the volume manager, i.e. what volumes should be
	// attached and which pods are referencing the volumes.
	// The data structure is populated by the desired state of the world
	// populator using the kubelet pod manager.
	desiredStateOfWorld cache.DesiredStateOfWorld
	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager, i.e. which volumes are attached to
	// this node and what pods the volumes are mounted to.
	// The data structure is populated upon successful completion of attach,
	// detach, mount, and unmount actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld
	// operationExecutor is used to start asynchronous attach, detach, mount,
	// and unmount operations.
	operationExecutor operationexecutor.OperationExecutor
	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
	// detach, mount, and unmount operations using the operationExecutor.
	reconciler reconciler.Reconciler
	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the desiredStateOfWorld using the kubelet PodManager.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
(5.3)Run函数
- 路径pkg/kubelet/volumemanager/volume_manager.go
// Run starts the volume manager's two asynchronous loops, the desired state
// of world populator and the reconciler, each in its own goroutine, and then
// blocks until stopCh is closed.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// NOTE(review): runtime is presumably the Kubernetes util/runtime package,
	// whose HandleCrash deals with a panic raised in this goroutine — confirm.
	defer runtime.HandleCrash()
	// The populator feeds the desired state; it receives the same stop
	// channel as the reconciler.
	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	glog.V(2).Infof("The desired_state_of_world populator starts")
	glog.Infof("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(stopCh)
	// Park here until the caller closes stopCh.
	<-stopCh
	glog.Infof("Shutting down Kubelet Volume Manager")
}
(5.3.1)vm.desiredStateOfWorldPopulator.Run函数
- 路径pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
- 主要调用populatorLoopFunc函数
// Run drives the populator in two phases. It first polls, running one
// populator pass per poll, until a pass has completed that began after all
// sources reported ready; it then sets hasAddedPods and keeps running passes
// every loopSleepDuration until stopCh is closed.
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	warmUpPass := func() (bool, error) {
		// Readiness is sampled before the pass runs, so polling only ends
		// once a pass that started with all sources ready has finished.
		ready := sourcesReady.AllReady()
		pass := dswp.populatorLoopFunc()
		pass()
		return ready, nil
	}
	wait.PollUntil(dswp.loopSleepDuration, warmUpPass, stopCh)

	// Publish that pods have been added, under the lock guarding the flag.
	func() {
		dswp.hasAddedPodsLock.Lock()
		defer dswp.hasAddedPodsLock.Unlock()
		dswp.hasAddedPods = true
	}()

	steadyPass := dswp.populatorLoopFunc()
	wait.Until(steadyPass, dswp.loopSleepDuration, stopCh)
}
(5.3.1.1)populatorLoopFunc函数
- 路径pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
- 主要是findAndAddNewPods函数
// populatorLoopFunc returns the function that performs one populator pass:
// it always adds new pods, and removes deleted pods only when at least
// getPodStatusRetryDuration has elapsed since timeOfLastGetPodStatus.
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
	onePass := func() {
		dswp.findAndAddNewPods()

		// findAndRemoveDeletedPods() asks the container runtime whether a
		// pod's containers have terminated. That is expensive, so it is rate
		// limited independently of the main populator loop.
		if time.Since(dswp.timeOfLastGetPodStatus) >= dswp.getPodStatusRetryDuration {
			dswp.findAndRemoveDeletedPods()
			return
		}

		glog.V(5).Infof(
			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
			dswp.getPodStatusRetryDuration)
	}
	return onePass
}
(5.4)findAndAddNewPods函数
- findAndAddNewPods函数遍历所有pods,对于状态为terminated状态的不做处理,
- 主要调用函数为processPodVolumes
// findAndAddNewPods walks every pod known to the pod manager and processes
// the volumes of each pod that is not terminated, so that they are added to
// the desired state of world if they should be there but are not.
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	pods := dswp.podManager.GetPods()
	for i := range pods {
		// Volumes are never (re)added for terminated pods.
		if !dswp.isPodTerminated(pods[i]) {
			dswp.processPodVolumes(pods[i])
		}
	}
}
(5.5)processPodVolumes函数
- find
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
uniquePodName := volumehelper.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}
allVolumesAdded := true
mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers)
// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mountsMap, devicesMap)
// Add volume to desired state of world
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
// some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
}
}