Add option to limit the number of concurrent mkfs calls

This commit is contained in:
Artem Minyaylov 2023-02-04 11:05:33 -08:00
parent f573e14942
commit d2dd415f00
4 changed files with 196 additions and 16 deletions

View File

@ -147,6 +147,37 @@ func NewMountError(mountErrorValue MountErrorType, format string, args ...interf
type SafeFormatAndMount struct {
Interface
Exec utilexec.Interface
formatSem chan any
formatTimeout time.Duration
}
func NewSafeFormatAndMount(mounter Interface, exec utilexec.Interface, opts ...Option) *SafeFormatAndMount {
res := &SafeFormatAndMount{
Interface: mounter,
Exec: exec,
}
for _, opt := range opts {
opt(res)
}
return res
}
type Option func(*SafeFormatAndMount)
// WithMaxConcurrentFormat sets the maximum number of concurrent format
// operations executed by the mounter. The timeout controls the maximum
// duration of a format operation before its concurrency token is released.
// Once a token is released, it can be acquired by another concurrent format
// operation. The original operation is allowed to complete.
// If n < 1, concurrency is set to unlimited.
func WithMaxConcurrentFormat(n int, timeout time.Duration) Option {
return func(mounter *SafeFormatAndMount) {
if n > 0 {
mounter.formatSem = make(chan any, n)
mounter.formatTimeout = timeout
}
}
}
// FormatAndMount formats the given disk, if needed, and mounts it.

View File

@ -515,7 +515,8 @@ func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target
args = append(formatOptions, args...)
klog.Infof("Disk %q appears to be unformatted, attempting to format as type: %q with options: %v", source, fstype, args)
output, err := mounter.Exec.Command("mkfs."+fstype, args...).CombinedOutput()
output, err := mounter.format(fstype, args)
if err != nil {
// Do not log sensitiveOptions only options
sensitiveOptionsLog := sanitizedOptionsForLogging(options, sensitiveOptions)
@ -550,6 +551,29 @@ func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target
return nil
}
func (mounter *SafeFormatAndMount) format(fstype string, args []string) ([]byte, error) {
if mounter.formatSem != nil {
done := make(chan struct{})
defer close(done)
mounter.formatSem <- struct{}{}
go func() {
defer func() { <-mounter.formatSem }()
timeout := time.NewTimer(mounter.formatTimeout)
defer timeout.Stop()
select {
case <-done:
case <-timeout.C:
}
}()
}
return mounter.Exec.Command("mkfs."+fstype, args...).CombinedOutput()
}
func getDiskFormat(exec utilexec.Interface, disk string) (string, error) {
args := []string{"-p", "-s", "TYPE", "-s", "PTTYPE", "-o", "export", disk}
klog.V(4).Infof("Attempting to determine if disk %q is formatted using blkid with args: (%v)", disk, args)

View File

@ -21,10 +21,12 @@ package mount
import (
"errors"
"fmt"
"io/ioutil"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -560,44 +562,40 @@ func mountArgsContainOption(t *testing.T, mountArgs []string, option string) boo
}
func TestDetectSafeNotMountedBehavior(t *testing.T) {
// example output for umount from util-linux 2.30.2
// Example output for umount from util-linux 2.30.2
notMountedOutput := "umount: /foo: not mounted."
// Arrange
testcases := []struct {
fakeCommandAction testexec.FakeCommandAction
expectedSafe bool
}{
{
fakeCommandAction: makeFakeCommandAction(notMountedOutput, errors.New("any error")),
fakeCommandAction: makeFakeCommandAction(notMountedOutput, errors.New("any error"), nil),
expectedSafe: true,
},
{
fakeCommandAction: makeFakeCommandAction(notMountedOutput, nil),
fakeCommandAction: makeFakeCommandAction(notMountedOutput, nil, nil),
expectedSafe: false,
},
{
fakeCommandAction: makeFakeCommandAction("any output", nil),
fakeCommandAction: makeFakeCommandAction("any output", nil, nil),
expectedSafe: false,
},
{
fakeCommandAction: makeFakeCommandAction("any output", errors.New("any error")),
fakeCommandAction: makeFakeCommandAction("any output", errors.New("any error"), nil),
expectedSafe: false,
},
}
for _, v := range testcases {
// Prepare
fakeexec := &testexec.FakeExec{
LookPathFunc: func(s string) (string, error) {
return "fake-umount", nil
},
CommandScript: []testexec.FakeCommandAction{v.fakeCommandAction},
}
// Act
isSafe := detectSafeNotMountedBehaviorWithExec(fakeexec)
// Assert
if isSafe != v.expectedSafe {
if detectSafeNotMountedBehaviorWithExec(fakeexec) != v.expectedSafe {
var adj string
if v.expectedSafe {
adj = "safe"
@ -609,10 +607,96 @@ func TestDetectSafeNotMountedBehavior(t *testing.T) {
}
}
func makeFakeCommandAction(stdout string, err error) testexec.FakeCommandAction {
func TestFormat(t *testing.T) {
const (
formatCount = 5
fstype = "ext4"
output = "complete"
cmdDuration = 1 * time.Millisecond
defaultTimeout = 1 * time.Minute
)
tests := []struct {
desc string
max int
timeout time.Duration
wantConcurrent int
}{
{
max: 0,
wantConcurrent: formatCount,
},
{
max: -1,
wantConcurrent: formatCount,
},
{
max: 1,
wantConcurrent: 1,
},
{
max: 3,
wantConcurrent: 3,
},
{
max: 3,
timeout: 1 * time.Nanosecond,
wantConcurrent: formatCount,
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("max=%d,timeout=%s", tc.max, tc.timeout.String()), func(t *testing.T) {
if tc.timeout == 0 {
tc.timeout = defaultTimeout
}
var concurrent, maxConcurrent int
var mu sync.Mutex
exec := &testexec.FakeExec{}
for i := 0; i < formatCount; i++ {
exec.CommandScript = append(exec.CommandScript, makeFakeCommandAction(output, nil, func() {
mu.Lock()
concurrent++
if concurrent > maxConcurrent {
maxConcurrent = concurrent
}
mu.Unlock()
time.Sleep(cmdDuration)
mu.Lock()
concurrent--
mu.Unlock()
}))
}
mounter := NewSafeFormatAndMount(nil, exec, WithMaxConcurrentFormat(tc.max, tc.timeout))
var wg sync.WaitGroup
for i := 0; i < formatCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mounter.format(fstype, nil)
}()
}
wg.Wait()
if maxConcurrent != tc.wantConcurrent {
t.Errorf("SafeFormatAndMount.format() got concurrency: %d, want: %d", maxConcurrent, tc.wantConcurrent)
}
})
}
}
func makeFakeCommandAction(stdout string, err error, cmdFn func()) testexec.FakeCommandAction {
c := testexec.FakeCmd{
CombinedOutputScript: []testexec.FakeAction{
func() ([]byte, []byte, error) {
if cmdFn != nil {
cmdFn()
}
return []byte(stdout), nil, err
},
},

View File

@ -17,9 +17,11 @@ limitations under the License.
package mount
import (
"fmt"
"reflect"
"strings"
"testing"
"time"
)
func TestMakeBindOpts(t *testing.T) {
@ -142,7 +144,6 @@ func TestMakeBindOptsSensitive(t *testing.T) {
}
func TestOptionsForLogging(t *testing.T) {
// Arrange
testcases := []struct {
options []string
sensitiveOptions []string
@ -165,10 +166,8 @@ func TestOptionsForLogging(t *testing.T) {
}
for _, v := range testcases {
// Act
maskedStr := sanitizedOptionsForLogging(v.options, v.sensitiveOptions)
// Assert
for _, sensitiveOption := range v.sensitiveOptions {
if strings.Contains(maskedStr, sensitiveOption) {
t.Errorf("Found sensitive log option %q in %q", sensitiveOption, maskedStr)
@ -182,3 +181,45 @@ func TestOptionsForLogging(t *testing.T) {
}
}
}
func TestWithMaxConcurrentFormat(t *testing.T) {
const timeout = 1 * time.Minute
tests := []struct {
max int
wantSem bool
}{
{
max: 0,
},
{
max: -1,
},
{
max: 1,
wantSem: true,
},
{
max: 3,
wantSem: true,
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("max=%d,timeout=%s", tc.max, timeout.String()), func(t *testing.T) {
mounter := NewSafeFormatAndMount(nil, nil, WithMaxConcurrentFormat(tc.max, timeout))
if gotSem := mounter.formatSem != nil; gotSem != tc.wantSem {
t.Errorf("NewSafeFormatAndMount() got formatSem: %t, want: %t", gotSem, tc.wantSem)
}
if tc.wantSem {
if gotCap := cap(mounter.formatSem); gotCap != tc.max {
t.Errorf("NewSafeFormatAndMount() got cap(formatSem): %d, want: %d", gotCap, tc.max)
}
if mounter.formatTimeout != timeout {
t.Errorf("NewSafeFormatAndMount() got formatTimeout: %s, want: %s", mounter.formatTimeout.String(), timeout.String())
}
}
})
}
}