git version: https://github.com/kubernetes-csi/csi-driver-host-path v1.1.0
Changelog
- Added raw block support: #6, #42
- Added ephemeral volume support: #10
- Improved deployment examples: #14, #27, #29
- Added snapshot examples: #19, #31
- Fix csi-sanity check: #28
- Persist data across restarts: #20
启动命令
/hostpathplugin --v=8 --endpoint=unix:///csi/csi.sock --nodeid=master-node
分为三阶段就是 Provision Attach Mount
```
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----^---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
              Stage |    | Unstage
             Volume |    | Volume
                +---v----+---+
                | VOL_READY  |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
```
The lifecycle of a dynamically provisioned volume, from
creation to destruction, when the Node Plugin advertises the
STAGE_UNSTAGE_VOLUME capability.
main
--> handle
--> NewHostPathDriver
--> driver.Run
1. handle
主要执行体,NewHostPathDriver 实例化 hostPath,定义如下:
type hostPath struct { name string nodeID string version string endpoint string ephemeral bool ids *identityServer ns *nodeServer cs *controllerServer }
// handle builds the hostpath driver from the command-line flag values and
// runs it. If the driver cannot be constructed, the error is printed and the
// process exits with status 1.
func handle() {
	hp, initErr := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, version, *ephemeral)
	if initErr == nil {
		hp.Run()
		return
	}

	fmt.Printf("Failed to initialize driver: %s", initErr.Error())
	os.Exit(1)
}
1.1 Run 函数
实例化 identityServer,实现了 GetPluginInfo Probe GetPluginCapabilities 方法
实例化 nodeServer,实现了 NodePublishVolume NodeUnpublishVolume NodeStageVolume NodeUnstageVolume NodeGetInfo NodeGetCapabilities NodeGetVolumeStats 方法
实例化 controllerServer,实现了 CreateVolume DeleteVolume ControllerPublishVolume ControllerUnpublishVolume ListVolumes CreateSnapshot DeleteSnapshot 方法
创建 GRPC server,并启动 GRPC 服务,监听 socket 根据 --endpoint=unix:///csi/csi.sock 设置
func (hp *hostPath) Run() { // Create GRPC servers hp.ids = NewIdentityServer(hp.name, hp.version) hp.ns = NewNodeServer(hp.nodeID, hp.ephemeral) hp.cs = NewControllerServer(hp.ephemeral) s := NewNonBlockingGRPCServer() s.Start(hp.endpoint, hp.ids, hp.cs, hp.ns) s.Wait() }
2. controllerServer
实现了 CreateVolume DeleteVolume ControllerPublishVolume ControllerUnpublishVolume ListVolumes CreateSnapshot DeleteSnapshot 方法
2.1 NewControllerServer
实例化 controllerServer,拥有能力 CREATE_DELETE_VOLUME CREATE_DELETE_SNAPSHOT LIST_SNAPSHOTS CLONE_VOLUME
// NewControllerServer returns a controllerServer whose capability list depends
// on the driver mode: in ephemeral mode no RPC capability types are passed to
// getControllerServiceCapabilities (nil); otherwise volume create/delete,
// snapshot create/delete, snapshot listing and volume cloning are passed.
func NewControllerServer(ephemeral bool) *controllerServer {
	// Stays nil in ephemeral mode, matching the explicit nil argument.
	var rpcTypes []csi.ControllerServiceCapability_RPC_Type
	if !ephemeral {
		rpcTypes = []csi.ControllerServiceCapability_RPC_Type{
			csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
			csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
			csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
			csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
		}
	}

	return &controllerServer{caps: getControllerServiceCapabilities(rpcTypes)}
}
2.2 CreateVolume
这个接受从 external-provisioner 发送的 GRPC 请求 CreateVolumeRequest
先检查是否具备 CREATE_DELETE_VOLUME 能力;请求中的 name 和 volume capabilities 参数必须设置,否则返回 InvalidArgument。
// CreateVolume handles a csi.CreateVolumeRequest. Only the request validation
// at the top of the method is shown in this excerpt; the remainder of the body
// is not included here.
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
// Validate the request against the CREATE_DELETE_VOLUME capability; on failure
// the request is logged at verbosity 3 and the validation error is returned as is.
if err := cs.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("invalid create volume req: %v", req)
return nil, err
}
// Check arguments
// An empty volume name is rejected with InvalidArgument.
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
}
// NOTE(review): this only rejects a nil capability list; a non-nil but empty
// list passes this check — confirm whether that is intended.
caps := req.GetVolumeCapabilities()
if caps == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
}
2.2.1 检查全局 hostPathVolumes 是否存在相同名情况
// A successful lookup by name means a volume with this name already exists.
// It is reused when its size is at least the requested number of bytes;
// otherwise the request is rejected with AlreadyExists.
if existing, lookupErr := getVolumeByName(req.GetName()); lookupErr == nil {
	if existing.VolSize < req.GetCapacityRange().GetRequiredBytes() {
		return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName())
	}

	// The existing volume is compatible with the new request and is reused.
	// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
	return &csi.CreateVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:      existing.VolID,
			CapacityBytes: existing.VolSize,
			VolumeContext: req.GetParameters(),
		},
	}, nil
}
2.2.2 请求 access 类型为 block 情况
fallocate 用于为文件预分配磁盘空间,可以用来截取或者扩展文件。它直接操作文件系统中该文件的空间分配信息,扩展文件时不需要逐块填充数据(效果类似稀疏文件),因此创建大文件比较快。下面几条命令的效果类似:
# fallocate -l 10G bigfile
# truncate -s 10G bigfile
# dd of=bigfile bs=1 seek=10G count=0
调用命令 losetup,具体看如下 ‘知识 loop设备及losetup’ 介绍
// For block access the volume is a regular file of the requested size that is
// then attached through volumepathhandler's AttachFileDevice.
if requestedAccessType == blockAccess {
	executor := utilexec.New()
	size := fmt.Sprintf("%dM", capacity/mib)
	// Create a block file.
	out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput()
	if err != nil {
		glog.V(3).Infof("failed to create block device: %v", string(out))
		return nil, err
	}

	// Associate block file with the loop device.
	volPathHandler := volumepathhandler.VolumePathHandler{}
	_, err = volPathHandler.AttachFileDevice(path)
	if err != nil {
		glog.Errorf("failed to attach device: %v", err)
		// Remove the block file because it'll no longer be used again.
		// The check must test err2 (the os.Remove result); testing err here
		// would log a cleanup failure even when the removal succeeded.
		if err2 := os.Remove(path); err2 != nil {
			glog.Errorf("failed to cleanup block file %s: %v", path, err2)
		}
		return nil, status.Error(codes.Internal, fmt.Sprintf("failed to attach device: %v", err))
	}
}
2.2.3 创建一个目录
// createHostpathVolume records a new hostpath volume in hostPathVolumes and,
// for mount access, first creates its directory (mode 0777) at the path
// returned by getVolumePath. It returns a pointer to the volume record, or the
// error from directory creation.
func createHostpathVolume(volID, name string, cap int64, volAccessType accessType) (*hostPathVolume, error) {
	volDir := getVolumePath(volID)

	// Only mount-access volumes need a directory here.
	if volAccessType == mountAccess {
		if mkErr := os.MkdirAll(volDir, 0777); mkErr != nil {
			return nil, mkErr
		}
	}

	vol := hostPathVolume{
		VolID:         volID,
		VolName:       name,
		VolSize:       cap,
		VolPath:       volDir,
		VolAccessType: volAccessType,
	}
	hostPathVolumes[volID] = vol
	return &vol, nil
}
2.2.4 从 snapshot 或已有 volume(clone)创建 volume 的情况
// Pre-populate the new volume when the request names a content source: either
// a snapshot archive that is unpacked with tar, or an existing volume whose
// contents are copied with cp. Every failure path calls
// deleteHostpathVolume(volumeID) before returning the error.
if contentSource := req.GetVolumeContentSource(); contentSource != nil {
	if contentSource.GetSnapshot() != nil {
		snapshotID := contentSource.GetSnapshot().GetSnapshotId()
		snapshot, ok := hostPathVolumeSnapshots[snapshotID]
		if !ok {
			deleteHostpathVolume(volumeID)
			return nil, status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotID)
		}
		if !snapshot.ReadyToUse {
			deleteHostpathVolume(volumeID)
			return nil, status.Errorf(codes.Internal, "Snapshot %v is not yet ready to use.", snapshotID)
		}

		// Unpack the snapshot archive into the new volume's path.
		args := []string{"zxvf", snapshot.Path, "-C", path}
		executor := utilexec.New()
		out, err := executor.Command("tar", args...).CombinedOutput()
		if err != nil {
			deleteHostpathVolume(volumeID)
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data for volume: %v: %s", err, out))
		}
	}

	if srcVolume := contentSource.GetVolume(); srcVolume != nil {
		srcVolumeID := srcVolume.GetVolumeId()
		hostPathVolume, ok := hostPathVolumes[srcVolumeID]
		if !ok {
			deleteHostpathVolume(volumeID)
			return nil, status.Error(codes.NotFound, "source volumeID does not exist, are source/destination in the same storage class?")
		}

		srcPath := hostPathVolume.VolPath
		isEmpty, err := hostPathIsEmpty(srcPath)
		if err != nil {
			deleteHostpathVolume(volumeID)
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed verification check of source hostpath volume: %s: %v", srcVolumeID, err))
		}

		// An empty source volume needs no copy, so it is skipped.
		if !isEmpty {
			// The command is executed directly, not through a shell, so a
			// "/*" glob would reach cp as a literal file name and fail.
			// "/." copies the directory's contents (including dotfiles)
			// without relying on glob expansion.
			args := []string{"-a", srcPath + "/.", path + "/"}
			executor := utilexec.New()
			out, err := executor.Command("cp", args...).CombinedOutput()
			if err != nil {
				deleteHostpathVolume(volumeID)
				return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data (clone) for volume: %s: %v: %s", volumeID, err, out))
			}
		}
	}
}
2.3 CreateSnapshot 函数
检验是否有能力 CREATE_DELETE_SNAPSHOT,请求 name 必须,volume id 必须
// CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create
// archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin.
//
// Only the request validation at the top of the method is shown in this
// excerpt; the remainder of the body is not included here.
func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
// Validate the request against the CREATE_DELETE_SNAPSHOT capability; on failure
// the request is logged at verbosity 3 and the validation error is returned as is.
if err := cs.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
glog.V(3).Infof("invalid create snapshot req: %v", req)
return nil, err
}
// An empty snapshot name is rejected with InvalidArgument.
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
}
// Check arguments
// An empty source volume ID is rejected with InvalidArgument.
if len(req.GetSourceVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "SourceVolumeId missing in request")
}
2.3.1 是否已经存在相同的 snapshot 情况
// A successful lookup by name means a snapshot with this name already exists.
// If it was taken from the same source volume it is returned as the result;
// otherwise the request is rejected with AlreadyExists.
if existing, lookupErr := getSnapshotByName(req.GetName()); lookupErr == nil {
	if existing.VolID != req.GetSourceVolumeId() {
		return nil, status.Errorf(codes.AlreadyExists, "snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName())
	}

	// The same snapshot has already been created.
	return &csi.CreateSnapshotResponse{
		Snapshot: &csi.Snapshot{
			SnapshotId:     existing.Id,
			SourceVolumeId: existing.VolID,
			CreationTime:   &existing.CreationTime,
			SizeBytes:      existing.SizeBytes,
			ReadyToUse:     existing.ReadyToUse,
		},
	}, nil
}
2.3.2 主要创建 snapshot 操作,其实就是 tar 打包命令
snapshotID := uuid.NewUUID().String()
creationTime := ptypes.TimestampNow()
volPath := hostPathVolume.VolPath
filePath := []string{snapshotRoot, "/", snapshotID, ".tgz"}
file := strings.Join(filePath, "")
args := []string{}
if hostPathVolume.VolAccessType == blockAccess {
glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume")
args = []string{"czf", file, volPath}
} else {
glog.V(4).Infof("Creating snapshot of Filsystem Mode Volume")
args = []string{"czf", file, "-C", volPath, "."}
}
executor := utilexec.New()
out, err := executor.Command("tar", args...).CombinedOutput()
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed create snapshot: %v: %s", err, out))
}
知识 loop设备及losetup
loop 设备是一种伪设备(pseudo-device),或者也可以说是仿真设备。它能使我们像块设备一样访问一个文件。
在使用之前,一个 loop 设备必须要和一个文件进行连接。
losetup命令
设置循环设备。循环设备可把文件虚拟成块设备,借此来模拟整个文件系统,让用户得以将其视为硬盘驱动器、光驱或软驱等设备,并挂入当作目录来使用。
losetup [ -e encryption ] [ -o offset ] loop_device file
losetup [ -d ] loop_device
(1)创建空的磁盘镜像文件
$ dd if=/dev/zero of=floppy.img bs=512 count=1000
(2)使用 losetup 将磁盘镜像文件虚拟成块设备
$ losetup /dev/loop1 floppy.img
(3)挂载块设备
$ mount /dev/loop1 /tmp