diff --git a/staging/src/k8s.io/mount-utils/mount.go b/staging/src/k8s.io/mount-utils/mount.go index d7845ac495c..0bd91ee617b 100644 --- a/staging/src/k8s.io/mount-utils/mount.go +++ b/staging/src/k8s.io/mount-utils/mount.go @@ -147,6 +147,37 @@ func NewMountError(mountErrorValue MountErrorType, format string, args ...interf type SafeFormatAndMount struct { Interface Exec utilexec.Interface + + formatSem chan any + formatTimeout time.Duration +} + +func NewSafeFormatAndMount(mounter Interface, exec utilexec.Interface, opts ...Option) *SafeFormatAndMount { + res := &SafeFormatAndMount{ + Interface: mounter, + Exec: exec, + } + for _, opt := range opts { + opt(res) + } + return res +} + +type Option func(*SafeFormatAndMount) + +// WithMaxConcurrentFormat sets the maximum number of concurrent format +// operations executed by the mounter. The timeout controls the maximum +// duration of a format operation before its concurrency token is released. +// Once a token is released, it can be acquired by another concurrent format +// operation. The original operation is allowed to complete. +// If n < 1, concurrency is set to unlimited. +func WithMaxConcurrentFormat(n int, timeout time.Duration) Option { + return func(mounter *SafeFormatAndMount) { + if n > 0 { + mounter.formatSem = make(chan any, n) + mounter.formatTimeout = timeout + } + } } // FormatAndMount formats the given disk, if needed, and mounts it. diff --git a/staging/src/k8s.io/mount-utils/mount_linux.go b/staging/src/k8s.io/mount-utils/mount_linux.go index 50e9382828f..f0125fcb489 100644 --- a/staging/src/k8s.io/mount-utils/mount_linux.go +++ b/staging/src/k8s.io/mount-utils/mount_linux.go @@ -515,7 +515,8 @@ func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target args = append(formatOptions, args...) klog.Infof("Disk %q appears to be unformatted, attempting to format as type: %q with options: %v", source, fstype, args) - output, err := mounter.Exec.Command("mkfs."+fstype, args...).CombinedOutput() + + output, err := mounter.format(fstype, args) if err != nil { // Do not log sensitiveOptions only options sensitiveOptionsLog := sanitizedOptionsForLogging(options, sensitiveOptions) @@ -550,6 +551,29 @@ func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target return nil } +func (mounter *SafeFormatAndMount) format(fstype string, args []string) ([]byte, error) { + if mounter.formatSem != nil { + done := make(chan struct{}) + defer close(done) + + mounter.formatSem <- struct{}{} + + go func() { + defer func() { <-mounter.formatSem }() + + timeout := time.NewTimer(mounter.formatTimeout) + defer timeout.Stop() + + select { + case <-done: + case <-timeout.C: + } + }() + } + + return mounter.Exec.Command("mkfs."+fstype, args...).CombinedOutput() +} + func getDiskFormat(exec utilexec.Interface, disk string) (string, error) { args := []string{"-p", "-s", "TYPE", "-s", "PTTYPE", "-o", "export", disk} klog.V(4).Infof("Attempting to determine if disk %q is formatted using blkid with args: (%v)", disk, args) diff --git a/staging/src/k8s.io/mount-utils/mount_linux_test.go b/staging/src/k8s.io/mount-utils/mount_linux_test.go index 26291030a45..dec91c72050 100644 --- a/staging/src/k8s.io/mount-utils/mount_linux_test.go +++ b/staging/src/k8s.io/mount-utils/mount_linux_test.go @@ -21,10 +21,12 @@ package mount import ( "errors" + "fmt" "io/ioutil" "os" "reflect" "strings" + "sync" "testing" "time" @@ -560,44 +562,40 @@ func mountArgsContainOption(t *testing.T, mountArgs []string, option string) boo } func TestDetectSafeNotMountedBehavior(t *testing.T) { - // example output for umount from util-linux 2.30.2 + // Example output for umount from util-linux 2.30.2 notMountedOutput := "umount: /foo: not mounted." - // Arrange testcases := []struct { fakeCommandAction testexec.FakeCommandAction expectedSafe bool }{ { - fakeCommandAction: makeFakeCommandAction(notMountedOutput, errors.New("any error")), + fakeCommandAction: makeFakeCommandAction(notMountedOutput, errors.New("any error"), nil), expectedSafe: true, }, { - fakeCommandAction: makeFakeCommandAction(notMountedOutput, nil), + fakeCommandAction: makeFakeCommandAction(notMountedOutput, nil, nil), expectedSafe: false, }, { - fakeCommandAction: makeFakeCommandAction("any output", nil), + fakeCommandAction: makeFakeCommandAction("any output", nil, nil), expectedSafe: false, }, { - fakeCommandAction: makeFakeCommandAction("any output", errors.New("any error")), + fakeCommandAction: makeFakeCommandAction("any output", errors.New("any error"), nil), expectedSafe: false, }, } for _, v := range testcases { - // Prepare fakeexec := &testexec.FakeExec{ LookPathFunc: func(s string) (string, error) { return "fake-umount", nil }, CommandScript: []testexec.FakeCommandAction{v.fakeCommandAction}, } - // Act - isSafe := detectSafeNotMountedBehaviorWithExec(fakeexec) - // Assert - if isSafe != v.expectedSafe { + + if detectSafeNotMountedBehaviorWithExec(fakeexec) != v.expectedSafe { var adj string if v.expectedSafe { adj = "safe" @@ -609,10 +607,96 @@ func TestDetectSafeNotMountedBehavior(t *testing.T) { } } -func makeFakeCommandAction(stdout string, err error) testexec.FakeCommandAction { +func TestFormat(t *testing.T) { + const ( + formatCount = 5 + fstype = "ext4" + output = "complete" + cmdDuration = 1 * time.Millisecond + defaultTimeout = 1 * time.Minute + ) + + tests := []struct { + desc string + max int + timeout time.Duration + wantConcurrent int + }{ + { + max: 0, + wantConcurrent: formatCount, + }, + { + max: -1, + wantConcurrent: formatCount, + }, + { + max: 1, + wantConcurrent: 1, + }, + { + max: 3, + wantConcurrent: 3, + }, + { + max: 3, + timeout: 1 * time.Nanosecond, + wantConcurrent: formatCount, + }, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("max=%d,timeout=%s", tc.max, tc.timeout.String()), func(t *testing.T) { + if tc.timeout == 0 { + tc.timeout = defaultTimeout + } + + var concurrent, maxConcurrent int + var mu sync.Mutex + + exec := &testexec.FakeExec{} + for i := 0; i < formatCount; i++ { + exec.CommandScript = append(exec.CommandScript, makeFakeCommandAction(output, nil, func() { + mu.Lock() + concurrent++ + if concurrent > maxConcurrent { + maxConcurrent = concurrent + } + mu.Unlock() + + time.Sleep(cmdDuration) + + mu.Lock() + concurrent-- + mu.Unlock() + })) + } + mounter := NewSafeFormatAndMount(nil, exec, WithMaxConcurrentFormat(tc.max, tc.timeout)) + + var wg sync.WaitGroup + for i := 0; i < formatCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + mounter.format(fstype, nil) + }() + } + wg.Wait() + + if maxConcurrent != tc.wantConcurrent { + t.Errorf("SafeFormatAndMount.format() got concurrency: %d, want: %d", maxConcurrent, tc.wantConcurrent) + } + }) + } +} + +func makeFakeCommandAction(stdout string, err error, cmdFn func()) testexec.FakeCommandAction { c := testexec.FakeCmd{ CombinedOutputScript: []testexec.FakeAction{ func() ([]byte, []byte, error) { + if cmdFn != nil { + cmdFn() + } return []byte(stdout), nil, err }, }, diff --git a/staging/src/k8s.io/mount-utils/mount_test.go b/staging/src/k8s.io/mount-utils/mount_test.go index 041a28dae00..4fa6484d59e 100644 --- a/staging/src/k8s.io/mount-utils/mount_test.go +++ b/staging/src/k8s.io/mount-utils/mount_test.go @@ -17,9 +17,11 @@ limitations under the License. package mount import ( + "fmt" "reflect" "strings" "testing" + "time" ) func TestMakeBindOpts(t *testing.T) { @@ -142,7 +144,6 @@ func TestMakeBindOptsSensitive(t *testing.T) { } func TestOptionsForLogging(t *testing.T) { - // Arrange testcases := []struct { options []string sensitiveOptions []string @@ -165,10 +166,8 @@ func TestOptionsForLogging(t *testing.T) { } for _, v := range testcases { - // Act maskedStr := sanitizedOptionsForLogging(v.options, v.sensitiveOptions) - // Assert for _, sensitiveOption := range v.sensitiveOptions { if strings.Contains(maskedStr, sensitiveOption) { t.Errorf("Found sensitive log option %q in %q", sensitiveOption, maskedStr) @@ -182,3 +181,45 @@ func TestOptionsForLogging(t *testing.T) { } } } + +func TestWithMaxConcurrentFormat(t *testing.T) { + const timeout = 1 * time.Minute + + tests := []struct { + max int + wantSem bool + }{ + { + max: 0, + }, + { + max: -1, + }, + { + max: 1, + wantSem: true, + }, + { + max: 3, + wantSem: true, + }, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("max=%d,timeout=%s", tc.max, timeout.String()), func(t *testing.T) { + mounter := NewSafeFormatAndMount(nil, nil, WithMaxConcurrentFormat(tc.max, timeout)) + + if gotSem := mounter.formatSem != nil; gotSem != tc.wantSem { + t.Errorf("NewSafeFormatAndMount() got formatSem: %t, want: %t", gotSem, tc.wantSem) + } + if tc.wantSem { + if gotCap := cap(mounter.formatSem); gotCap != tc.max { + t.Errorf("NewSafeFormatAndMount() got cap(formatSem): %d, want: %d", gotCap, tc.max) + } + if mounter.formatTimeout != timeout { + t.Errorf("NewSafeFormatAndMount() got formatTimeout: %s, want: %s", mounter.formatTimeout.String(), timeout.String()) + } + } + }) + } +}