From f4bd4e2e965fe6ef093ccea0cc8ef4cc40ce0c5f Mon Sep 17 00:00:00 2001
From: whypro
Date: Fri, 1 Nov 2019 16:17:04 +0800
Subject: [PATCH] Return error instead of panicking when the CPU manager fails
 to start.

---
 pkg/kubelet/cm/container_manager_linux.go     |  5 +-
 pkg/kubelet/cm/cpumanager/cpu_manager.go      | 23 +++--
 pkg/kubelet/cm/cpumanager/cpu_manager_test.go |  9 +-
 pkg/kubelet/cm/cpumanager/fake_cpu_manager.go |  3 +-
 pkg/kubelet/cm/cpumanager/policy.go           |  2 +-
 pkg/kubelet/cm/cpumanager/policy_none.go      |  3 +-
 pkg/kubelet/cm/cpumanager/policy_static.go    | 14 +--
 .../cm/cpumanager/policy_static_test.go       | 81 +++++++++--------
 .../cm/cpumanager/state/state_checkpoint.go   | 38 +++++---
 .../state/state_compatibility_test.go         | 10 ++-
 pkg/kubelet/cm/cpumanager/state/state_file.go | 51 +++++++----
 .../cm/cpumanager/state/state_file_test.go    | 89 ++++++++-----------
 12 files changed, 191 insertions(+), 137 deletions(-)

diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 90ad0c1bb49..c4056941aa0 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -579,7 +579,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 		if err != nil {
 			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
 		}
-		cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+		err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+		if err != nil {
+			return fmt.Errorf("start cpu manager error: %v", err)
+		}
 	}
 
 	// cache the node Info including resource capacity and
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index ec9ae4333be..65231747493 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -53,7 +53,7 @@ const cpuManagerStateFileName = "cpu_manager_state"
 // Manager interface provides methods for Kubelet to manage pod cpus.
 type Manager interface {
 	// Start is called during Kubelet initialization.
-	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap)
+	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
 
 	// AddContainer is called between container create and container start
 	// so that initial CPU affinity settings can be written through to the
@@ -155,7 +155,10 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 		// exclusively allocated.
 		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
 		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
-		policy = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
+		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
+		if err != nil {
+			return nil, fmt.Errorf("new static policy error: %v", err)
+		}
 
 	default:
 		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
@@ -173,7 +176,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 	return manager, nil
 }
 
-func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
+func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
 	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
 	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
 	m.sourcesReady = sourcesReady
@@ -183,16 +186,22 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
 	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), initialContainers)
 	if err != nil {
-		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v\n", err)
-		panic("[cpumanager] - please drain node and remove policy state file")
+		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
+		return err
 	}
 	m.state = stateImpl
 
-	m.policy.Start(m.state)
+	err = m.policy.Start(m.state)
+	if err != nil {
+		klog.Errorf("[cpumanager] policy start error: %v", err)
+		return err
+	}
+
 	if m.policy.Name() == string(PolicyNone) {
-		return
+		return nil
 	}
 
 	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
+	return nil
 }
 
 func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index c1c494583dc..84a094a17f3 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -100,7 +100,8 @@ func (p *mockPolicy) Name() string {
 	return "mock"
 }
 
-func (p *mockPolicy) Start(s state.State) {
+func (p *mockPolicy) Start(s state.State) error {
+	return p.err
 }
 
 func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
@@ -206,7 +207,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
 }
 
 func TestCPUManagerAdd(t *testing.T) {
-	testPolicy := NewStaticPolicy(
+	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
 			NumSockets: 1,
@@ -464,7 +465,7 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
 
 		state := &mockState{
 			assignments:   testCase.stAssignments,
@@ -941,7 +942,7 @@ func TestReconcileState(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 func TestCPUManagerAddWithResvList(t *testing.T) {
-	testPolicy := NewStaticPolicy(
+	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
 			NumSockets: 1,
diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
index 7adfc2ba59e..d3b3eca6c1c 100644
--- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -30,8 +30,9 @@ type fakeManager struct {
 	state state.State
 }
 
-func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
+func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
 	klog.Info("[fake cpumanager] Start()")
+	return nil
 }
 
 func (m *fakeManager) Policy() Policy {
diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go
index aead99ba5c4..63e031b4a95 100644
--- a/pkg/kubelet/cm/cpumanager/policy.go
+++ b/pkg/kubelet/cm/cpumanager/policy.go
@@ -25,7 +25,7 @@ import (
 // Policy implements logic for pod container to CPU assignment.
 type Policy interface {
 	Name() string
-	Start(s state.State)
+	Start(s state.State) error
 	// AddContainer call is idempotent
 	AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
 	// RemoveContainer call is idempotent
diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go
index 8f769f62374..be46369836b 100644
--- a/pkg/kubelet/cm/cpumanager/policy_none.go
+++ b/pkg/kubelet/cm/cpumanager/policy_none.go
@@ -39,8 +39,9 @@ func (p *nonePolicy) Name() string {
 	return string(PolicyNone)
 }
 
-func (p *nonePolicy) Start(s state.State) {
+func (p *nonePolicy) Start(s state.State) error {
 	klog.Info("[cpumanager] none policy: Start")
+	return nil
 }
 
 func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 61fe3cd8a63..8502c1f2f0b 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -85,7 +85,7 @@ var _ Policy = &staticPolicy{}
 // NewStaticPolicy returns a CPU manager policy that does not change CPU
 // assignments for exclusively pinned guaranteed containers after the main
 // container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) Policy {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
 	allCPUs := topology.CPUDetails.CPUs()
 	var reserved cpuset.CPUSet
 	if reservedCPUs.Size() > 0 {
@@ -100,7 +100,8 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	}
 
 	if reserved.Size() != numReservedCPUs {
-		panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs))
+		err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
+		return nil, err
 	}
 
 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
@@ -109,18 +110,19 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 		topology: topology,
 		reserved: reserved,
 		affinity: affinity,
-	}
+	}, nil
 }
 
 func (p *staticPolicy) Name() string {
 	return string(PolicyStatic)
 }
 
-func (p *staticPolicy) Start(s state.State) {
+func (p *staticPolicy) Start(s state.State) error {
 	if err := p.validateState(s); err != nil {
-		klog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error())
-		panic("[cpumanager] - please drain node and remove policy state file")
+		klog.Errorf("[cpumanager] static policy invalid state: %v, please drain node and remove policy state file", err)
+		return err
 	}
+	return nil
 }
 
 func (p *staticPolicy) validateState(s state.State) error {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index f43c52a0a90..9e67692b459 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -41,11 +41,10 @@ type staticPolicyTest struct {
 	expErr          error
 	expCPUAlloc     bool
 	expCSet         cpuset.CPUSet
-	expPanic        bool
 }
 
 func TestStaticPolicyName(t *testing.T) {
-	policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+	policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
 
 	policyName := policy.Name()
 	if policyName != "static" {
@@ -81,7 +80,7 @@ func TestStaticPolicyStart(t *testing.T) {
 			numReservedCPUs: 2,
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(0, 1),
-			expPanic:        true,
+			expErr:          fmt.Errorf("not all reserved cpus: \"0,6\" are present in defaultCpuSet: \"0-1\""),
 		},
 		{
 			description: "assigned core 2 is still present in available cpuset",
@@ -92,7 +91,7 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
-			expPanic:        true,
+			expErr:          fmt.Errorf("pod: fakePod, container: 0 cpuset: \"0-2\" overlaps with default cpuset \"2-11\""),
 		},
 		{
 			description: "core 12 is not present in topology but is in state cpuset",
@@ -104,7 +103,7 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10, 11, 12),
-			expPanic:        true,
+			expErr:          fmt.Errorf("current set of available CPUs \"0-11\" doesn't match with CPUs in state \"0-12\""),
 		},
 		{
 			description: "core 11 is present in topology but is not in state cpuset",
@@ -116,26 +115,25 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10),
-			expPanic:        true,
+			expErr:          fmt.Errorf("current set of available CPUs \"0-11\" doesn't match with CPUs in state \"0-10\""),
 		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			defer func() {
-				if err := recover(); err != nil {
-					if !testCase.expPanic {
-						t.Errorf("unexpected panic occurred: %q", err)
-					}
-				} else if testCase.expPanic {
-					t.Error("expected panic doesn't occurred")
-				}
-			}()
-			policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
+			p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+			policy := p.(*staticPolicy)
 			st := &mockState{
 				assignments:   testCase.stAssignments,
 				defaultCPUSet: testCase.stDefaultCPUSet,
 			}
-			policy.Start(st)
+			err := policy.Start(st)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expErr, err)
+			}
+			if err != nil {
+				return
+			}
 
 			if !testCase.stDefaultCPUSet.IsEmpty() {
 				for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ {
@@ -438,7 +436,7 @@ func TestStaticPolicyAdd(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -539,7 +537,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -629,12 +627,17 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 		},
 	}
 	for _, tc := range testCases {
-		policy := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
+		p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy := p.(*staticPolicy)
 		st := &mockState{
 			assignments:   tc.stAssignments,
 			defaultCPUSet: tc.stDefaultCPUSet,
 		}
-		policy.Start(st)
+		err := policy.Start(st)
+		if err != nil {
+			t.Errorf("StaticPolicy Start() error (%v)", err)
+			continue
+		}
 
 		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
 		if err != nil {
@@ -661,9 +664,9 @@ type staticPolicyTestWithResvList struct {
 	stDefaultCPUSet cpuset.CPUSet
 	pod             *v1.Pod
 	expErr          error
+	expNewErr       error
 	expCPUAlloc     bool
 	expCSet         cpuset.CPUSet
-	expPanic        bool
 }
 
 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -684,7 +687,7 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			reserved:        cpuset.NewCPUSet(0, 1),
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5),
-			expPanic:        true,
+			expErr:          fmt.Errorf("not all reserved cpus: \"0-1\" are present in defaultCpuSet: \"2-5\""),
 		},
 		{
 			description: "inconsistency between numReservedCPUs and reserved",
@@ -693,26 +696,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			reserved:        cpuset.NewCPUSet(0, 1),
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
-			expPanic:        true,
+			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			defer func() {
-				if err := recover(); err != nil {
-					if !testCase.expPanic {
-						t.Errorf("unexpected panic occurred: %q", err)
-					}
-				} else if testCase.expPanic {
-					t.Error("expected panic doesn't occurred")
-				}
-			}()
-			policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()).(*staticPolicy)
+			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
+			if !reflect.DeepEqual(err, testCase.expNewErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expNewErr, err)
+			}
+			if err != nil {
+				return
+			}
+			policy := p.(*staticPolicy)
 			st := &mockState{
 				assignments:   testCase.stAssignments,
 				defaultCPUSet: testCase.stDefaultCPUSet,
 			}
-			policy.Start(st)
+			err = policy.Start(st)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expErr, err)
+			}
+			if err != nil {
+				return
+			}
 			if !st.GetDefaultCPUSet().Equals(testCase.expCSet) {
 				t.Errorf("State CPUSet is different than expected. Have %q wants: %q",
 					st.GetDefaultCPUSet(),
@@ -769,7 +778,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go
index cd8bd994a69..929fa03a0dc 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go
@@ -55,8 +55,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string, initialCont
 
 	if err := stateCheckpoint.restoreState(); err != nil {
 		//lint:ignore ST1005 user-facing error message
-		return nil, fmt.Errorf("could not restore state from checkpoint: %v\n"+
-			"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
+		return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet",
 			err, path.Join(stateDir, checkpointName))
 	}
 
@@ -105,8 +104,7 @@ func (sc *stateCheckpoint) restoreState() error {
 	checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0
 	if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
 		if err == errors.ErrCheckpointNotFound {
-			sc.storeState()
-			return nil
+			return sc.storeState()
 		}
 		return err
 	}
@@ -144,7 +142,7 @@ func (sc *stateCheckpoint) restoreState() error {
 }
 
 // saves state to a checkpoint, caller is responsible for locking
-func (sc *stateCheckpoint) storeState() {
+func (sc *stateCheckpoint) storeState() error {
 	checkpoint := NewCPUManagerCheckpoint()
 	checkpoint.PolicyName = sc.policyName
 	checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
@@ -158,10 +156,11 @@ func (sc *stateCheckpoint) storeState() {
 	}
 
 	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
 	if err != nil {
-		panic("[cpumanager] could not save checkpoint: " + err.Error())
+		klog.Errorf("[cpumanager] could not save checkpoint: %v", err)
+		return err
 	}
+	return nil
 }
 
 // GetCPUSet returns current CPU set
@@ -202,7 +201,10 @@ func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset c
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetCPUSet(podUID, containerName, cset)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 // SetDefaultCPUSet sets default CPU set
@@ -210,7 +212,10 @@ func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetDefaultCPUSet(cset)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 // SetCPUAssignments sets CPU to pod assignments
@@ -218,7 +223,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetCPUAssignments(a)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 // Delete deletes assignment for specified pod
@@ -226,7 +234,10 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.Delete(podUID, containerName)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 // ClearState clears the state and saves it in a checkpoint
@@ -234,5 +245,8 @@ func (sc *stateCheckpoint) ClearState() {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.ClearState()
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
diff --git a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go
index a02bf82e4a5..0493d4bb7aa 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go
@@ -46,7 +46,10 @@ func TestFileToCheckpointCompatibility(t *testing.T) {
 	// ensure testing state is removed after testing
 	defer os.Remove(statePath)
 
-	fileState := NewFileState(statePath, "none", nil)
+	fileState, err := NewFileState(statePath, "none", nil)
+	if err != nil {
+		t.Fatalf("could not create new file state: %v", err)
+	}
 
 	fileState.SetDefaultCPUSet(state.defaultCPUSet)
 	fileState.SetCPUAssignments(state.assignments)
@@ -76,7 +79,10 @@ func TestCheckpointToFileCompatibility(t *testing.T) {
 	checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
 	checkpointState.SetCPUAssignments(state.assignments)
 
-	restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
+	restoredState, err := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
+	if err != nil {
+		t.Fatalf("could not create new file state: %v", err)
+	}
 
 	AssertStateEqual(t, restoredState, state)
 }
diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go
index f3b8e70470d..667f511af90 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_file.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_file.go
@@ -51,7 +51,7 @@ type stateFile struct {
 }
 
 // NewFileState creates new State for keeping track of cpu/pod assignment with file backend
-func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State {
+func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) (State, error) {
 	stateFile := &stateFile{
 		stateFilePath:     filePath,
 		cache:             NewMemoryState(),
@@ -61,13 +61,14 @@ func NewFileState(filePath string, policyName string, initialContainers containe
 
 	if err := stateFile.tryRestoreState(); err != nil {
 		// could not restore state, init new state file
-		msg := fmt.Sprintf("[cpumanager] state file: unable to restore state from disk (%s)\n", err.Error()) +
-			"Panicking because we cannot guarantee sane CPU affinity for existing containers.\n" +
-			fmt.Sprintf("Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.", stateFile.stateFilePath)
-		panic(msg)
+		klog.Errorf("[cpumanager] state file: unable to restore state from disk (%v)"+
+			" We cannot guarantee sane CPU affinity for existing containers."+
+			" Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.",
+			err, stateFile.stateFilePath)
+		return nil, err
 	}
 
-	return stateFile
+	return stateFile, nil
 }
 
 // migrateV1StateToV2State() converts state from the v1 format to the v2 format
@@ -112,7 +113,10 @@ func (sf *stateFile) tryRestoreState() error {
 
 	// If the state file does not exist or has zero length, write a new file.
 	if os.IsNotExist(err) || len(content) == 0 {
-		sf.storeState()
+		err := sf.storeState()
+		if err != nil {
+			return err
+		}
 		klog.Infof("[cpumanager] state file: created new state file \"%s\"", sf.stateFilePath)
 		return nil
 	}
@@ -169,7 +173,7 @@ func (sf *stateFile) tryRestoreState() error {
 }
 
 // saves state to a file, caller is responsible for locking
-func (sf *stateFile) storeState() {
+func (sf *stateFile) storeState() error {
 	var content []byte
 	var err error
 
@@ -188,12 +192,14 @@ func (sf *stateFile) storeState() {
 	}
 
 	if content, err = json.Marshal(data); err != nil {
-		panic("[cpumanager] state file: could not serialize state to json")
+		return fmt.Errorf("[cpumanager] state file: could not serialize state to json")
 	}
 
 	if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil {
-		panic("[cpumanager] state file not written")
+		return fmt.Errorf("[cpumanager] state file not written")
 	}
+
+	return nil
 }
 
 func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
@@ -228,33 +234,48 @@ func (sf *stateFile) SetCPUSet(podUID string, containerName string, cset cpuset.
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetCPUSet(podUID, containerName, cset)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 func (sf *stateFile) SetDefaultCPUSet(cset cpuset.CPUSet) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetDefaultCPUSet(cset)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
}
 
 func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetCPUAssignments(a)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 func (sf *stateFile) Delete(podUID string, containerName string) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.Delete(podUID, containerName)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
 
 func (sf *stateFile) ClearState() {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.ClearState()
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go
index a928e9529a9..a0eae9f1071 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go
@@ -76,7 +76,6 @@ func TestFileStateTryRestore(t *testing.T) {
 		policyName        string
 		initialContainers containermap.ContainerMap
 		expErr            string
-		expPanic          bool
 		expectedState     *stateMemory
 	}{
 		{
@@ -85,7 +84,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -94,7 +92,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -103,7 +100,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments:   ContainerCPUAssignments{},
 				defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
@@ -115,7 +111,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -132,7 +127,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
 					"pod": map[string]cpuset.CPUSet{
@@ -153,7 +147,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"B",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -162,7 +155,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -180,7 +172,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
 					"pod": map[string]cpuset.CPUSet{
@@ -200,7 +191,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -218,7 +208,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -227,7 +216,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments:   ContainerCPUAssignments{},
 				defaultCPUSet: cpuset.NewCPUSet(),
@@ -251,7 +239,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			return cm
 		}(),
 		"",
-		false,
 		&stateMemory{
 			assignments: ContainerCPUAssignments{
 				"pod": map[string]cpuset.CPUSet{
@@ -266,18 +253,6 @@ func TestFileStateTryRestore(t *testing.T) {
 
 	for idx, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			defer func() {
-				if tc.expPanic {
-					r := recover()
-					panicMsg := r.(string)
-					if !strings.HasPrefix(panicMsg, tc.expErr) {
-						t.Fatalf(`expected panic "%s" but got "%s"`, tc.expErr, panicMsg)
-					} else {
-						t.Logf(`got expected panic "%s"`, panicMsg)
-					}
-				}
-			}()
-
 			sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx))
 			if err != nil {
 				t.Errorf("cannot create temporary file: %q", err.Error())
@@ -291,7 +266,8 @@ func TestFileStateTryRestore(t *testing.T) {
 			defer os.Remove(sfilePath.Name())
 
 			logData, fileState := stderrCapture(t, func() State {
-				return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
+				newFileState, _ := NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
+				return newFileState
 			})
 
 			if tc.expErr != "" {
@@ -306,38 +282,36 @@ func TestFileStateTryRestore(t *testing.T) {
 				}
 			}
 
+			if fileState == nil {
+				return
+			}
+
 			AssertStateEqual(t, fileState, tc.expectedState)
 		})
 	}
 }
 
-func TestFileStateTryRestorePanic(t *testing.T) {
+func TestFileStateTryRestoreError(t *testing.T) {
 
-	testCase := struct {
-		description  string
-		wantPanic    bool
-		panicMessage string
+	testCases := []struct {
+		description string
+		expErr      error
 	}{
-		"Panic creating file",
-		true,
-		"[cpumanager] state file not written",
+		{
+			"create file error",
+			fmt.Errorf("[cpumanager] state file not written"),
+		},
 	}
-	t.Run(testCase.description, func(t *testing.T) {
-		sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test")
-		defer func() {
-			if err := recover(); err != nil {
-				if testCase.wantPanic {
-					if testCase.panicMessage == err {
-						t.Logf("tryRestoreState() got expected panic = %v", err)
-						return
-					}
-					t.Errorf("tryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage)
-				}
+	for _, testCase := range testCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test")
+			_, err := NewFileState(sfilePath, "static", nil)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("unexpected error, expected: %s, got: %s", testCase.expErr, err)
 			}
-		}()
-		NewFileState(sfilePath, "static", nil)
-	})
+		})
+	}
 }
 
 func TestUpdateStateFile(t *testing.T) {
@@ -417,7 +391,11 @@ func TestUpdateStateFile(t *testing.T) {
 				return
 			}
 		}
-		newFileState := NewFileState(sfilePath.Name(), "static", nil)
+		newFileState, err := NewFileState(sfilePath.Name(), "static", nil)
+		if err != nil {
+			t.Errorf("NewFileState() error: %v", err)
+			return
+		}
 		AssertStateEqual(t, newFileState, tc.expectedState)
 	})
 }
@@ -476,7 +454,12 @@ func TestHelpersStateFile(t *testing.T) {
 			t.Errorf("cannot create temporary test file: %q", err.Error())
 		}
 
-		state := NewFileState(sfFile.Name(), "static", nil)
+		state, err := NewFileState(sfFile.Name(), "static", nil)
+		if err != nil {
+			t.Errorf("new file state error: %v", err)
+			return
+		}
+
 		state.SetDefaultCPUSet(tc.defaultCPUset)
 
 		for podUID := range tc.assignments {
@@ -523,7 +506,11 @@ func TestClearStateStateFile(t *testing.T) {
 			t.Errorf("cannot create temporary test file: %q", err.Error())
 		}
 
-		state := NewFileState(sfFile.Name(), "static", nil)
+		state, err := NewFileState(sfFile.Name(), "static", nil)
+		if err != nil {
+			t.Errorf("new file state error: %v", err)
+			return
+		}
 		state.SetDefaultCPUSet(testCase.defaultCPUset)
 		for podUID := range testCase.assignments {
 			for containerName, containerCPUs := range testCase.assignments[podUID] {
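
Every former panic() site above now hands its error up one chain: Policy.Start ->
Manager.Start -> containerManagerImpl.Start, where kubelet can fail its own startup
cleanly. A minimal, self-contained sketch of that chain, with hypothetical stand-in
types (toyPolicy, toyManager are illustrative names, not the kubelet types) and only
the standard library:

	package main

	import (
		"errors"
		"fmt"
	)

	// toyPolicy stands in for a cpumanager Policy whose Start can fail.
	type toyPolicy struct{ startErr error }

	// Start returns an error instead of panicking, mirroring the new
	// Policy.Start(s state.State) error signature in the patch.
	func (p toyPolicy) Start() error { return p.startErr }

	// toyManager stands in for the cpumanager Manager.
	type toyManager struct{ policy toyPolicy }

	// Start propagates the policy error to its caller rather than
	// crashing the whole process.
	func (m toyManager) Start() error {
		if err := m.policy.Start(); err != nil {
			return fmt.Errorf("policy start error: %v", err)
		}
		return nil
	}

	func main() {
		m := toyManager{policy: toyPolicy{startErr: errors.New("invalid state")}}
		if err := m.Start(); err != nil {
			// The top-level caller decides how to fail, as
			// container_manager_linux.go now does.
			fmt.Println("start cpu manager error:", err)
		}
	}

The design choice this illustrates: only the outermost caller has enough context to
decide whether a bad CPU-manager state is fatal, so inner layers log and return
rather than panic.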