Merge pull request #84705 from whypro/cpumanager-panic-master

Return an error instead of panicking when the CPU manager fails on startup.
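The change applies one pattern throughout: functions that previously called panic() on an unrecoverable startup condition now return an error, and every caller either propagates it or logs it. A minimal before/after sketch of the shape (simplified from the diffs below, not the exact code):

// Before: a startup failure takes down the whole kubelet process.
func (m *manager) Start( /* ... */ ) {
	if err := m.policy.Start(m.state); err != nil {
		panic("[cpumanager] - please drain node and remove policy state file")
	}
}

// After: the failure is returned to the caller, which can wrap it with
// context, so kubelet can fail startup cleanly instead of panicking.
func (m *manager) Start( /* ... */ ) error {
	if err := m.policy.Start(m.state); err != nil {
		klog.Errorf("[cpumanager] policy start error: %v", err)
		return err
	}
	return nil
}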
Kubernetes Prow Robot 2020-01-20 07:25:37 -08:00 committed by GitHub
commit 23fa359d6c
12 changed files with 191 additions and 137 deletions

View File

@@ -579,7 +579,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 		if err != nil {
 			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
 		}
-		cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+		err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+		if err != nil {
+			return fmt.Errorf("start cpu manager error: %v", err)
+		}
 	}

 	// cache the node Info including resource capacity and

View File

@@ -53,7 +53,7 @@ const cpuManagerStateFileName = "cpu_manager_state"

 // Manager interface provides methods for Kubelet to manage pod cpus.
 type Manager interface {
 	// Start is called during Kubelet initialization.
-	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap)
+	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

 	// AddContainer is called between container create and container start
 	// so that initial CPU affinity settings can be written through to the
@@ -155,7 +155,10 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 		// exclusively allocated.
 		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
 		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
-		policy = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
+		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
+		if err != nil {
+			return nil, fmt.Errorf("new static policy error: %v", err)
+		}

 	default:
 		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
@@ -173,7 +176,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
 	return manager, nil
 }

-func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
+func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
 	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
 	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
 	m.sourcesReady = sourcesReady
@@ -183,16 +186,22 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
 	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), initialContainers)
 	if err != nil {
-		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v\n", err)
-		panic("[cpumanager] - please drain node and remove policy state file")
+		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
+		return err
 	}
 	m.state = stateImpl

-	m.policy.Start(m.state)
+	err = m.policy.Start(m.state)
+	if err != nil {
+		klog.Errorf("[cpumanager] policy start error: %v", err)
+		return err
+	}

 	if m.policy.Name() == string(PolicyNone) {
-		return
+		return nil
 	}

 	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
+	return nil
 }

 func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {

View File

@@ -100,7 +100,8 @@ func (p *mockPolicy) Name() string {
 	return "mock"
 }

-func (p *mockPolicy) Start(s state.State) {
+func (p *mockPolicy) Start(s state.State) error {
+	return p.err
 }

 func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
@@ -206,7 +207,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
 }

 func TestCPUManagerAdd(t *testing.T) {
-	testPolicy := NewStaticPolicy(
+	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
 			NumSockets: 1,
@@ -464,7 +465,7 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())

 		state := &mockState{
 			assignments: testCase.stAssignments,
@@ -941,7 +942,7 @@ func TestReconcileState(t *testing.T) {

 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 func TestCPUManagerAddWithResvList(t *testing.T) {
-	testPolicy := NewStaticPolicy(
+	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
 			NumSockets: 1,

View File

@@ -30,8 +30,9 @@ type fakeManager struct {
 	state state.State
 }

-func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
+func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
 	klog.Info("[fake cpumanager] Start()")
+	return nil
 }

 func (m *fakeManager) Policy() Policy {

View File

@@ -25,7 +25,7 @@ import (

 // Policy implements logic for pod container to CPU assignment.
 type Policy interface {
 	Name() string
-	Start(s state.State)
+	Start(s state.State) error
 	// AddContainer call is idempotent
 	AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
 	// RemoveContainer call is idempotent

View File

@@ -39,8 +39,9 @@ func (p *nonePolicy) Name() string {
 	return string(PolicyNone)
 }

-func (p *nonePolicy) Start(s state.State) {
+func (p *nonePolicy) Start(s state.State) error {
 	klog.Info("[cpumanager] none policy: Start")
+	return nil
 }

 func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {

View File

@@ -85,7 +85,7 @@ var _ Policy = &staticPolicy{}
 // NewStaticPolicy returns a CPU manager policy that does not change CPU
 // assignments for exclusively pinned guaranteed containers after the main
 // container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) Policy {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
 	allCPUs := topology.CPUDetails.CPUs()
 	var reserved cpuset.CPUSet
 	if reservedCPUs.Size() > 0 {
@@ -100,7 +100,8 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	}

 	if reserved.Size() != numReservedCPUs {
-		panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs))
+		err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
+		return nil, err
 	}

 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
@@ -109,18 +110,19 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 		topology: topology,
 		reserved: reserved,
 		affinity: affinity,
-	}
+	}, nil
 }

 func (p *staticPolicy) Name() string {
 	return string(PolicyStatic)
 }

-func (p *staticPolicy) Start(s state.State) {
+func (p *staticPolicy) Start(s state.State) error {
 	if err := p.validateState(s); err != nil {
-		klog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error())
-		panic("[cpumanager] - please drain node and remove policy state file")
+		klog.Errorf("[cpumanager] static policy invalid state: %v, please drain node and remove policy state file", err)
+		return err
 	}
+	return nil
 }

 func (p *staticPolicy) validateState(s state.State) error {

View File

@@ -41,11 +41,10 @@ type staticPolicyTest struct {
 	expErr          error
 	expCPUAlloc     bool
 	expCSet         cpuset.CPUSet
-	expPanic        bool
 }

 func TestStaticPolicyName(t *testing.T) {
-	policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+	policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())

 	policyName := policy.Name()
 	if policyName != "static" {
@@ -81,7 +80,7 @@ func TestStaticPolicyStart(t *testing.T) {
 			numReservedCPUs: 2,
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(0, 1),
-			expPanic:        true,
+			expErr:          fmt.Errorf("not all reserved cpus: \"0,6\" are present in defaultCpuSet: \"0-1\""),
 		},
 		{
 			description: "assigned core 2 is still present in available cpuset",
@@ -92,7 +91,7 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
-			expPanic:        true,
+			expErr:          fmt.Errorf("pod: fakePod, container: 0 cpuset: \"0-2\" overlaps with default cpuset \"2-11\""),
 		},
 		{
 			description: "core 12 is not present in topology but is in state cpuset",
@@ -104,7 +103,7 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10, 11, 12),
-			expPanic:        true,
+			expErr:          fmt.Errorf("current set of available CPUs \"0-11\" doesn't match with CPUs in state \"0-12\""),
 		},
 		{
 			description: "core 11 is present in topology but is not in state cpuset",
@@ -116,26 +115,25 @@ func TestStaticPolicyStart(t *testing.T) {
 				},
 			},
 			stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10),
-			expPanic:        true,
+			expErr:          fmt.Errorf("current set of available CPUs \"0-11\" doesn't match with CPUs in state \"0-10\""),
 		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			defer func() {
-				if err := recover(); err != nil {
-					if !testCase.expPanic {
-						t.Errorf("unexpected panic occurred: %q", err)
-					}
-				} else if testCase.expPanic {
-					t.Error("expected panic doesn't occurred")
-				}
-			}()
-			policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
+			p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+			policy := p.(*staticPolicy)
 			st := &mockState{
 				assignments:   testCase.stAssignments,
 				defaultCPUSet: testCase.stDefaultCPUSet,
 			}
-			policy.Start(st)
+			err := policy.Start(st)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expErr, err)
+			}
+			if err != nil {
+				return
+			}

 			if !testCase.stDefaultCPUSet.IsEmpty() {
 				for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ {
@@ -438,7 +436,7 @@ func TestStaticPolicyAdd(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments: testCase.stAssignments,
@@ -539,7 +537,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments: testCase.stAssignments,
@@ -629,12 +627,17 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 		},
 	}
 	for _, tc := range testCases {
-		policy := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
+		p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
+		policy := p.(*staticPolicy)
 		st := &mockState{
 			assignments:   tc.stAssignments,
 			defaultCPUSet: tc.stDefaultCPUSet,
 		}
-		policy.Start(st)
+		err := policy.Start(st)
+		if err != nil {
+			t.Errorf("StaticPolicy Start() error (%v)", err)
+			continue
+		}

 		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
 		if err != nil {
@@ -661,9 +664,9 @@ type staticPolicyTestWithResvList struct {
 	stDefaultCPUSet cpuset.CPUSet
 	pod             *v1.Pod
 	expErr          error
+	expNewErr       error
 	expCPUAlloc     bool
 	expCSet         cpuset.CPUSet
-	expPanic        bool
 }

 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -684,7 +687,7 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			reserved:        cpuset.NewCPUSet(0, 1),
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5),
-			expPanic:        true,
+			expErr:          fmt.Errorf("not all reserved cpus: \"0-1\" are present in defaultCpuSet: \"2-5\""),
 		},
 		{
 			description: "inconsistency between numReservedCPUs and reserved",
@@ -693,26 +696,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			reserved:        cpuset.NewCPUSet(0, 1),
 			stAssignments:   state.ContainerCPUAssignments{},
 			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
-			expPanic:        true,
+			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			defer func() {
-				if err := recover(); err != nil {
-					if !testCase.expPanic {
-						t.Errorf("unexpected panic occurred: %q", err)
-					}
-				} else if testCase.expPanic {
-					t.Error("expected panic doesn't occurred")
-				}
-			}()
-			policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()).(*staticPolicy)
+			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
+			if !reflect.DeepEqual(err, testCase.expNewErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expNewErr, err)
+			}
+			if err != nil {
+				return
+			}
+			policy := p.(*staticPolicy)
 			st := &mockState{
 				assignments:   testCase.stAssignments,
 				defaultCPUSet: testCase.stDefaultCPUSet,
 			}
-			policy.Start(st)
+			err = policy.Start(st)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
+					testCase.description, testCase.expErr, err)
+			}
+			if err != nil {
+				return
+			}

 			if !st.GetDefaultCPUSet().Equals(testCase.expCSet) {
 				t.Errorf("State CPUSet is different than expected. Have %q wants: %q", st.GetDefaultCPUSet(),
@@ -769,7 +778,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 	}

 	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())

 		st := &mockState{
 			assignments: testCase.stAssignments,
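The rewritten tests compare expected and actual errors with reflect.DeepEqual rather than ==. That works because fmt.Errorf (without %w) returns a pointer to an errors.errorString value, and DeepEqual follows the pointers and compares the underlying message strings, while == compares the pointers themselves. A small self-contained illustration of the difference:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	a := fmt.Errorf("state file not written")
	b := fmt.Errorf("state file not written")
	fmt.Println(a == b)                  // false: two distinct allocations
	fmt.Println(reflect.DeepEqual(a, b)) // true: identical messages
}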

View File

@@ -55,8 +55,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string, initialCont
 	if err := stateCheckpoint.restoreState(); err != nil {
 		//lint:ignore ST1005 user-facing error message
-		return nil, fmt.Errorf("could not restore state from checkpoint: %v\n"+
-			"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
+		return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet",
 			err, path.Join(stateDir, checkpointName))
 	}
@@ -100,8 +99,7 @@ func (sc *stateCheckpoint) restoreState() error {
 		checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0
 		if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
 			if err == errors.ErrCheckpointNotFound {
-				sc.storeState()
-				return nil
+				return sc.storeState()
 			}
 			return err
 		}
@@ -142,7 +140,7 @@ func (sc *stateCheckpoint) restoreState() error {
 }

 // saves state to a checkpoint, caller is responsible for locking
-func (sc *stateCheckpoint) storeState() {
+func (sc *stateCheckpoint) storeState() error {
 	checkpoint := NewCPUManagerCheckpoint()
 	checkpoint.PolicyName = sc.policyName
 	checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
@@ -156,10 +154,11 @@ func (sc *stateCheckpoint) storeState() error {
 	}

 	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
 	if err != nil {
-		panic("[cpumanager] could not save checkpoint: " + err.Error())
+		klog.Errorf("[cpumanager] could not save checkpoint: %v", err)
+		return err
 	}
+	return nil
 }

 // GetCPUSet returns current CPU set
@@ -200,7 +199,10 @@ func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset c
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetCPUSet(podUID, containerName, cset)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 // SetDefaultCPUSet sets default CPU set
@@ -208,7 +210,10 @@ func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetDefaultCPUSet(cset)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 // SetCPUAssignments sets CPU to pod assignments
@@ -216,7 +221,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetCPUAssignments(a)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 // Delete deletes assignment for specified pod
@@ -224,7 +232,10 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.Delete(podUID, containerName)
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 // ClearState clears the state and saves it in a checkpoint
@@ -232,5 +243,8 @@ func (sc *stateCheckpoint) ClearState() {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.ClearState()
-	sc.storeState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }
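A design choice worth noting in the setters above (and mirrored in the file backend below): the State interface keeps its void setter signatures, so a failed storeState() cannot be propagated out of a setter and is only logged as a warning. Persistence failures become hard errors only on the startup path (NewCheckpointState, NewFileState, and the policy Start methods). A condensed sketch of the setter pattern, not the exact diff text:

func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
	sc.mux.Lock()
	defer sc.mux.Unlock()
	sc.cache.SetDefaultCPUSet(cset)
	// storeState now returns an error, but the setter signature is
	// unchanged, so the failure is surfaced only in the kubelet log.
	if err := sc.storeState(); err != nil {
		klog.Warningf("store state to checkpoint error: %v", err)
	}
}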

View File

@@ -46,7 +46,10 @@ func TestFileToCheckpointCompatibility(t *testing.T) {
 	// ensure testing state is removed after testing
 	defer os.Remove(statePath)

-	fileState := NewFileState(statePath, "none", nil)
+	fileState, err := NewFileState(statePath, "none", nil)
+	if err != nil {
+		t.Fatalf("could not create new file state: %v", err)
+	}

 	fileState.SetDefaultCPUSet(state.defaultCPUSet)
 	fileState.SetCPUAssignments(state.assignments)
@@ -76,7 +79,10 @@ func TestCheckpointToFileCompatibility(t *testing.T) {
 	checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
 	checkpointState.SetCPUAssignments(state.assignments)

-	restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
+	restoredState, err := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
+	if err != nil {
+		t.Fatalf("could not create new file state: %v", err)
+	}

 	AssertStateEqual(t, restoredState, state)
 }

View File

@@ -51,7 +51,7 @@ type stateFile struct {
 }

 // NewFileState creates new State for keeping track of cpu/pod assignment with file backend
-func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State {
+func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) (State, error) {
 	stateFile := &stateFile{
 		stateFilePath: filePath,
 		cache:         NewMemoryState(),
@@ -61,13 +61,14 @@ func NewFileState(filePath string, policyName string, initialContainers containe
 	if err := stateFile.tryRestoreState(); err != nil {
 		// could not restore state, init new state file
-		msg := fmt.Sprintf("[cpumanager] state file: unable to restore state from disk (%s)\n", err.Error()) +
-			"Panicking because we cannot guarantee sane CPU affinity for existing containers.\n" +
-			fmt.Sprintf("Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.", stateFile.stateFilePath)
-		panic(msg)
+		klog.Errorf("[cpumanager] state file: unable to restore state from disk (%v)"+
+			" We cannot guarantee sane CPU affinity for existing containers."+
+			" Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.",
+			err, stateFile.stateFilePath)
+		return nil, err
 	}

-	return stateFile
+	return stateFile, nil
 }

 // migrateV1StateToV2State() converts state from the v1 format to the v2 format
@@ -106,7 +107,10 @@ func (sf *stateFile) tryRestoreState() error {
 	// If the state file does not exist or has zero length, write a new file.
 	if os.IsNotExist(err) || len(content) == 0 {
-		sf.storeState()
+		err := sf.storeState()
+		if err != nil {
+			return err
+		}
 		klog.Infof("[cpumanager] state file: created new state file \"%s\"", sf.stateFilePath)
 		return nil
 	}
@@ -166,7 +170,7 @@ func (sf *stateFile) tryRestoreState() error {
 }

 // saves state to a file, caller is responsible for locking
-func (sf *stateFile) storeState() {
+func (sf *stateFile) storeState() error {
 	var content []byte
 	var err error
@@ -185,12 +189,14 @@ func (sf *stateFile) storeState() error {
 	}

 	if content, err = json.Marshal(data); err != nil {
-		panic("[cpumanager] state file: could not serialize state to json")
+		return fmt.Errorf("[cpumanager] state file: could not serialize state to json")
 	}

 	if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil {
-		panic("[cpumanager] state file not written")
+		return fmt.Errorf("[cpumanager] state file not written")
 	}
+
+	return nil
 }

 func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
@@ -225,33 +231,48 @@ func (sf *stateFile) SetCPUSet(podUID string, containerName string, cset cpuset.
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetCPUSet(podUID, containerName, cset)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 func (sf *stateFile) SetDefaultCPUSet(cset cpuset.CPUSet) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetDefaultCPUSet(cset)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.SetCPUAssignments(a)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 func (sf *stateFile) Delete(podUID string, containerName string) {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.Delete(podUID, containerName)
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

 func (sf *stateFile) ClearState() {
 	sf.Lock()
 	defer sf.Unlock()
 	sf.cache.ClearState()
-	sf.storeState()
+	err := sf.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
 }

View File

@@ -76,7 +76,6 @@ func TestFileStateTryRestore(t *testing.T) {
 		policyName        string
 		initialContainers containermap.ContainerMap
 		expErr            string
-		expPanic          bool
 		expectedState     *stateMemory
 	}{
 		{
@@ -85,7 +84,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -94,7 +92,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -103,7 +100,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments:   ContainerCPUAssignments{},
 				defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
@@ -115,7 +111,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -132,7 +127,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
 					"pod": map[string]cpuset.CPUSet{
@@ -153,7 +147,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"B",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -162,7 +155,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
-			true,
 			&stateMemory{},
 		},
 		{
@@ -180,7 +172,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
 					"pod": map[string]cpuset.CPUSet{
@@ -200,7 +191,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -218,7 +208,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
-			true,
 			&stateMemory{},
 		},
 		{
@@ -227,7 +216,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			"none",
 			containermap.ContainerMap{},
 			"",
-			false,
 			&stateMemory{
 				assignments:   ContainerCPUAssignments{},
 				defaultCPUSet: cpuset.NewCPUSet(),
@@ -251,7 +239,6 @@ func TestFileStateTryRestore(t *testing.T) {
 			return cm
 		}(),
 		"",
-		false,
 		&stateMemory{
 			assignments: ContainerCPUAssignments{
 				"pod": map[string]cpuset.CPUSet{
@@ -266,18 +253,6 @@ func TestFileStateTryRestore(t *testing.T) {

 	for idx, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			defer func() {
-				if tc.expPanic {
-					r := recover()
-					panicMsg := r.(string)
-					if !strings.HasPrefix(panicMsg, tc.expErr) {
-						t.Fatalf(`expected panic "%s" but got "%s"`, tc.expErr, panicMsg)
-					} else {
-						t.Logf(`got expected panic "%s"`, panicMsg)
-					}
-				}
-			}()
 			sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx))
 			if err != nil {
 				t.Errorf("cannot create temporary file: %q", err.Error())
@@ -291,7 +266,8 @@ func TestFileStateTryRestore(t *testing.T) {
 			defer os.Remove(sfilePath.Name())

 			logData, fileState := stderrCapture(t, func() State {
-				return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
+				newFileState, _ := NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
+				return newFileState
 			})

 			if tc.expErr != "" {
@@ -306,38 +282,36 @@ func TestFileStateTryRestore(t *testing.T) {
 				}
 			}

+			if fileState == nil {
+				return
+			}
+
 			AssertStateEqual(t, fileState, tc.expectedState)
 		})
 	}
 }

-func TestFileStateTryRestorePanic(t *testing.T) {
-	testCase := struct {
+func TestFileStateTryRestoreError(t *testing.T) {
+	testCases := []struct {
 		description  string
-		wantPanic    bool
-		panicMessage string
+		expErr       error
 	}{
-		"Panic creating file",
-		true,
-		"[cpumanager] state file not written",
+		{
+			" create file error",
+			fmt.Errorf("[cpumanager] state file not written"),
+		},
 	}
-	t.Run(testCase.description, func(t *testing.T) {
-		sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test")
-		defer func() {
-			if err := recover(); err != nil {
-				if testCase.wantPanic {
-					if testCase.panicMessage == err {
-						t.Logf("tryRestoreState() got expected panic = %v", err)
-						return
-					}
-					t.Errorf("tryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage)
-				}
-			}
-		}()
-		NewFileState(sfilePath, "static", nil)
-	})
+	for _, testCase := range testCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test")
+			_, err := NewFileState(sfilePath, "static", nil)
+			if !reflect.DeepEqual(err, testCase.expErr) {
+				t.Errorf("unexpected error, expected: %s, got: %s", testCase.expErr, err)
+			}
+		})
+	}
 }

 func TestUpdateStateFile(t *testing.T) {
@@ -417,7 +391,11 @@ func TestUpdateStateFile(t *testing.T) {
 					return
 				}
 			}
-			newFileState := NewFileState(sfilePath.Name(), "static", nil)
+			newFileState, err := NewFileState(sfilePath.Name(), "static", nil)
+			if err != nil {
+				t.Errorf("NewFileState() error: %v", err)
+				return
+			}
 			AssertStateEqual(t, newFileState, tc.expectedState)
 		})
 	}
@@ -476,7 +454,12 @@ func TestHelpersStateFile(t *testing.T) {
 				t.Errorf("cannot create temporary test file: %q", err.Error())
 			}

-			state := NewFileState(sfFile.Name(), "static", nil)
+			state, err := NewFileState(sfFile.Name(), "static", nil)
+			if err != nil {
+				t.Errorf("new file state error: %v", err)
+				return
+			}
+
 			state.SetDefaultCPUSet(tc.defaultCPUset)

 			for podUID := range tc.assignments {
@@ -523,7 +506,11 @@ func TestClearStateStateFile(t *testing.T) {
 				t.Errorf("cannot create temporary test file: %q", err.Error())
 			}

-			state := NewFileState(sfFile.Name(), "static", nil)
+			state, err := NewFileState(sfFile.Name(), "static", nil)
+			if err != nil {
+				t.Errorf("new file state error: %v", err)
+				return
+			}
 			state.SetDefaultCPUSet(testCase.defaultCPUset)
 			for podUID := range testCase.assignments {
 				for containerName, containerCPUs := range testCase.assignments[podUID] {