Skip to content

Commit

Permalink
disk: re-implement StagingTargetPath cleanup logic
Browse files Browse the repository at this point in the history
We should unlink the file we created for volumeDevice.

We should check that targetPath is not mounted before we check for volumeDevice, in case the user has a file with the same name.

The overall complexity is also greatly reduced. We no longer call stat() on the mounted path, so there is no need to worry about EIO or similar errors.
  • Loading branch information
huww98 committed Dec 4, 2024
1 parent 9819c8b commit 3d8616d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 72 deletions.
118 changes: 60 additions & 58 deletions pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ const (
RundSocketDir = "/host/etc/kubernetes/volumes/rund/"
// VolumeDirRemove volume dir remove
VolumeDirRemove = "/host/etc/kubernetes/volumes/disk/remove"
// InputOutputErr tag
InputOutputErr = "input/output error"
// DiskMultiTenantEnable Enable disk multi-tenant mode
DiskMultiTenantEnable = "DISK_MULTI_TENANT_ENABLE"
// TenantUserUID tag
Expand Down Expand Up @@ -688,9 +686,44 @@ func addDiskXattr(diskID string) (err error) {
return unix.Setxattr(device, DiskXattrName, []byte(diskID), 0)
}

// ensureUnmounted verifies that target is no longer a mount point.
//
// It asks mounter.IsLikelyNotMountPoint about target and returns:
//   - a wrapped error if the check itself fails,
//   - an error if target is still reported as mounted,
//   - nil otherwise.
//
// target format: /var/lib/kubelet/plugins/kubernetes.io/csi/pv/pv-disk-1e7001e0-c54a-11e9-8f89-00163e0e78a0/globalmount
func ensureUnmounted(mounter k8smount.Interface, target string) error {
	switch notMnt, checkErr := mounter.IsLikelyNotMountPoint(target); {
	case checkErr != nil:
		return fmt.Errorf("failed to check if %s is not a mount point after unmount: %w", target, checkErr)
	case !notMnt:
		return fmt.Errorf("path %s is still mounted after unmount", target)
	default:
		return nil
	}
}

// cleanupVolumeDeviceMount unmounts path and then unlinks it.
//
// Unmount results are handled as follows:
//   - success: continue and unlink path;
//   - ENOENT: path does not exist, nothing to clean up, return nil;
//   - EINVAL: path may simply not be mounted, so still try to unlink it;
//   - any other error: returned unchanged.
//
// If the unlink fails, the returned error wraps the unlink error and, when the
// preceding unmount reported EINVAL, the unmount error as well, so neither
// cause is lost.
func cleanupVolumeDeviceMount(path string) error {
	errUnmount := unix.Unmount(path, 0)
	switch {
	case errUnmount == nil:
		// Unmounted; fall through to remove the file.
	case errors.Is(errUnmount, unix.ENOENT):
		return nil
	case errors.Is(errUnmount, unix.EINVAL):
		// Maybe not mounted, proceed to remove it. If it cannot be removed,
		// unlink will report the error.
	default:
		return errUnmount
	}

	errUnlink := unix.Unlink(path)
	if errUnlink == nil {
		return nil
	}
	if errUnmount != nil {
		// Both steps failed: report both, unmount first.
		return fmt.Errorf("failed to unmount %s: %w; then failed to unlink: %w", path, errUnmount, errUnlink)
	}
	return fmt.Errorf("failed to unlink %s: %w", path, errUnlink)
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
klog.Infof("NodeUnstageVolume:: Starting to Unmount volume, volumeId: %s, target: %v", req.VolumeId, req.StagingTargetPath)
logger := klog.FromContext(ctx)
logger.Info("Starting to Unmount volume", "target", req.StagingTargetPath)

if !ns.locks.TryAcquire(req.VolumeId) {
return nil, status.Errorf(codes.Aborted, "There is already an operation for %s", req.VolumeId)
Expand All @@ -699,66 +732,35 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag

// check block device mountpoint
targetPath := req.GetStagingTargetPath()
tmpPath := filepath.Join(req.GetStagingTargetPath(), req.VolumeId)
if IsFileExisting(tmpPath) {
fileInfo, err := os.Lstat(tmpPath)
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), InputOutputErr) {
if err = isPathAvailiable(targetPath); err != nil {
if err = ns.k8smounter.Unmount(targetPath); err != nil {
klog.Errorf("NodeUnstageVolume: umount target %s(input/output error) with error: %v", targetPath, err)
return nil, status.Errorf(codes.InvalidArgument, "NodeUnstageVolume umount target %s with errror: %v", targetPath, err)
}
klog.Warningf("NodeUnstageVolume: target path %s show input/output error: %v, umount it.", targetPath, err)
}
} else {
klog.Errorf("NodeUnstageVolume: lstat mountpoint: %s with error: %s", tmpPath, err.Error())
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume: stat mountpoint error: "+err.Error())
}
} else {
if (fileInfo.Mode() & os.ModeDevice) != 0 {
klog.Infof("NodeUnstageVolume: mountpoint %s, is block device", tmpPath)
err := unix.Unmount(targetPath, 0)
if err != nil {
switch {
case errors.Is(err, unix.ENOENT):
logger.Info("targetPath not exist, continue to detach")
case errors.Is(err, unix.EINVAL):
// Maybe unmounted, lets check
if errCheck := ensureUnmounted(ns.k8smounter, targetPath); errCheck != nil {
return nil, status.Errorf(codes.Internal, "failed to unmount %s: %v. %v", targetPath, err, errCheck)
}
// if mountpoint not a block device, maybe something wrong happened in VolumeStageVolume.
// when pod deleted, the volume should be detached
targetPath = tmpPath
}
}

// Step 1: check folder exists and umount
msgLog := ""
if IsFileExisting(targetPath) {
notmounted, err := ns.k8smounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
klog.Errorf("NodeUnstageVolume: VolumeId: %s, check mountPoint: %s error: %v", req.VolumeId, targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
if !notmounted {
err = ns.k8smounter.Unmount(targetPath)
// really umounted, check volumeDevice
// Note: we remove the blockPath, but not targetPath, because the former is created by us, while the latter is created by CO.
blockPath := filepath.Join(targetPath, req.VolumeId)
logger.Info("targetPath may not be a mountpoint, checking volumeDevice")
err := cleanupVolumeDeviceMount(blockPath)
if err != nil {
klog.Errorf("NodeUnstageVolume: VolumeId: %s, umount path: %s failed with: %v", req.VolumeId, targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
notmounted, err = ns.k8smounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check if %s is not a mount point after umount: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "failed to cleanup volumeDevice path %s: %v", blockPath, err)
}
if !notmounted {
klog.Errorf("NodeUnstageVolume: TargetPath mounted yet, volumeId: %s, Target: %s", req.VolumeId, targetPath)
return nil, status.Error(codes.Internal, "NodeUnstageVolume: TargetPath mounted yet with target"+targetPath)
}
} else {
msgLog = fmt.Sprintf("NodeUnstageVolume: VolumeId: %s, mountpoint: %s not mounted, skipping and continue to detach", req.VolumeId, targetPath)
default:
return nil, status.Errorf(codes.Internal, "failed to unmount %s: %v", targetPath, err)
}
} else {
msgLog = fmt.Sprintf("NodeUnstageVolume: VolumeId: %s, Path %s doesn't exist, continue to detach", req.VolumeId, targetPath)
}

if msgLog == "" {
klog.Infof("NodeUnstageVolume: Unmount TargetPath successful, target %v, volumeId: %s", targetPath, req.VolumeId)
} else {
klog.Infof(msgLog)
err := ensureUnmounted(ns.k8smounter, targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
logger.V(2).Info("targetPath cleaned up")

if IsVFNode() {
if err := unbindBdfDisk(req.VolumeId); err != nil {
Expand All @@ -777,7 +779,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
}

err := addDiskXattr(req.VolumeId)
err = addDiskXattr(req.VolumeId)
if err != nil {
klog.Errorf("NodeUnstageVolume: addDiskXattr %s failed: %v", req.VolumeId, err)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/disk/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,20 +872,6 @@ func GetVolumeDeviceName(diskID string) (string, error) {
return device, err
}

// isPathAvailiable checks that path can be opened and listed as a directory.
//
// It opens path and tries to read a single directory entry. An empty
// directory (io.EOF from Readdirnames) counts as available. Any other failure
// is returned wrapped with %w, so callers can inspect the cause with
// errors.Is / errors.As; the message text is unchanged.
func isPathAvailiable(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("Open Path (%s) with error: %w ", path, err)
	}
	defer f.Close()

	// Reading one entry is enough to prove the directory is readable.
	if _, err = f.Readdirnames(1); err != nil && err != io.EOF {
		return fmt.Errorf("Read Path (%s) with error: %w ", path, err)
	}
	return nil
}

func getBlockDeviceCapacity(devicePath string) int64 {

file, err := os.Open(devicePath)
Expand Down

0 comments on commit 3d8616d

Please sign in to comment.