diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 31fbde9337a..7ea022baf0b 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -461,6 +461,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { CgroupsPerQOS: s.CgroupsPerQOS, CgroupRoot: s.CgroupRoot, CgroupDriver: s.CgroupDriver, + KubeletRootDir: s.RootDirectory, ProtectKernelDefaults: s.ProtectKernelDefaults, NodeAllocatableConfig: cm.NodeAllocatableConfig{ KubeReservedCgroupName: s.KubeReservedCgroup, diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index ce19634d3f2..6c772e27b04 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -96,6 +96,7 @@ type NodeConfig struct { CgroupsPerQOS bool CgroupRoot string CgroupDriver string + KubeletRootDir string ProtectKernelDefaults bool NodeAllocatableConfig ExperimentalQOSReserved map[v1.ResourceName]int64 diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 22c4dbf1597..a1f8f054e10 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -291,6 +291,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I nodeConfig.ExperimentalCPUManagerReconcilePeriod, machineInfo, cm.GetNodeAllocatableReservation(), + nodeConfig.KubeletRootDir, ) if err != nil { glog.Errorf("failed to initialize cpu manager: %v", err) diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index 370217f3db5..f76a6a9312e 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -43,6 +43,7 @@ go_test( "//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index b91fdfa3ef0..d8ee14930a0 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" + "path" ) // ActivePodsFunc is a function that returns a list of pods to reconcile. @@ -44,6 +45,9 @@ type runtimeService interface { type policyName string +// CPUManagerStateFileName is the name file name where cpu manager stores it's state +const CPUManagerStateFileName = "cpu_manager_state" + // Manager interface provides methods for Kubelet to manage pod cpus. type Manager interface { // Start is called during Kubelet initialization. @@ -99,13 +103,16 @@ func NewManager( reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, + stateFileDirecory string, ) (Manager, error) { var policy Policy + var stateHandle state.State switch policyName(cpuPolicyName) { case PolicyNone: policy = NewNonePolicy() + stateHandle = state.NewMemoryState() case PolicyStatic: topo, err := topology.Discover(machineInfo) @@ -134,16 +141,18 @@ func NewManager( reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) policy = NewStaticPolicy(topo, numReservedCPUs) + stateHandle = state.NewFileState(path.Join(stateFileDirecory, CPUManagerStateFileName), policy.Name()) default: glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone) policy = NewNonePolicy() + stateHandle = state.NewMemoryState() } manager := &manager{ policy: policy, reconcilePeriod: reconcilePeriod, - state: state.NewMemoryState(), + state: stateHandle, machineInfo: machineInfo, nodeAllocatableReservation: nodeAllocatableReservation, } diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index e4df638c3db..2a3b1201a41 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -19,8 +19,12 @@ package cpumanager import ( "fmt" "reflect" + "strings" "testing" + "time" + cadvisorapi "github.com/google/cadvisor/info/v1" + "io/ioutil" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,6 +33,7 @@ import ( runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "os" ) type mockState struct { @@ -223,6 +228,127 @@ func TestCPUManagerAdd(t *testing.T) { } } +func TestCPUManagerGenerate(t *testing.T) { + testCases := []struct { + description string + cpuPolicyName string + nodeAllocatableReservation v1.ResourceList + isTopologyBroken bool + panicMsg string + expectedPolicy string + expectedError error + skipIfPermissionsError bool + }{ + { + description: "set none policy", + cpuPolicyName: "none", + nodeAllocatableReservation: nil, + expectedPolicy: "none", + }, + { + description: "invalid policy name", + cpuPolicyName: "invalid", + nodeAllocatableReservation: nil, + expectedPolicy: "none", + }, + { + description: "static policy", + cpuPolicyName: "static", + nodeAllocatableReservation: v1.ResourceList{v1.ResourceCPU: *resource.NewQuantity(3, resource.DecimalSI)}, + expectedPolicy: "static", + skipIfPermissionsError: true, + }, + { + description: "static policy - broken topology", + cpuPolicyName: "static", + nodeAllocatableReservation: v1.ResourceList{}, + isTopologyBroken: true, + expectedError: fmt.Errorf("could not detect number of cpus"), + skipIfPermissionsError: true, + }, + { + description: "static policy - broken reservation", + cpuPolicyName: "static", + nodeAllocatableReservation: v1.ResourceList{}, + panicMsg: "unable to determine reserved CPU resources for static policy", + skipIfPermissionsError: true, + }, + { + description: "static policy - no CPU resources", + cpuPolicyName: "static", + nodeAllocatableReservation: v1.ResourceList{v1.ResourceCPU: *resource.NewQuantity(0, resource.DecimalSI)}, + panicMsg: "the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero", + skipIfPermissionsError: true, + }, + } + + mockedMachineInfo := cadvisorapi.MachineInfo{ + NumCores: 4, + Topology: []cadvisorapi.Node{ + { + Cores: []cadvisorapi.Core{ + { + Id: 0, + Threads: []int{0}, + }, + { + Id: 1, + Threads: []int{1}, + }, + { + Id: 2, + Threads: []int{2}, + }, + { + Id: 3, + Threads: []int{3}, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + machineInfo := &mockedMachineInfo + if testCase.isTopologyBroken { + machineInfo = &cadvisorapi.MachineInfo{} + } + sDir, err := ioutil.TempDir("/tmp/", "cpu_manager_test") + if err != nil { + t.Errorf("cannot create state file: %s", err.Error()) + } + defer os.RemoveAll(sDir) + defer func() { + if err := recover(); err != nil { + if testCase.panicMsg != "" { + if !strings.Contains(err.(string), testCase.panicMsg) { + t.Errorf("Unexpected panic message. Have: %q wants %q", err, testCase.panicMsg) + } + } else { + t.Errorf("Unexpected panic: %q", err) + } + } else if testCase.panicMsg != "" { + t.Error("Expected panic hasn't been raised") + } + }() + + mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir) + if testCase.expectedError != nil { + if !strings.Contains(err.Error(), testCase.expectedError.Error()) { + t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) + } + } else { + rawMgr := mgr.(*manager) + if rawMgr.policy.Name() != testCase.expectedPolicy { + t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy) + } + } + }) + + } +} + func TestCPUManagerRemove(t *testing.T) { mgr := &manager{ policy: &mockPolicy{ diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 8f1403a6d2f..dfbb0a297d0 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -109,9 +109,45 @@ func (p *staticPolicy) Name() string { } func (p *staticPolicy) Start(s state.State) { - // Configure the shared pool to include all detected CPU IDs. - allCPUs := p.topology.CPUDetails.CPUs() - s.SetDefaultCPUSet(allCPUs) + if err := p.validateState(s); err != nil { + glog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error()) + panic("[cpumanager] - please drain node and remove policy state file") + } +} + +func (p *staticPolicy) validateState(s state.State) error { + tmpAssignments := s.GetCPUAssignments() + tmpDefaultCPUset := s.GetDefaultCPUSet() + + // Default cpuset cannot be empty when assignments exist + if tmpDefaultCPUset.IsEmpty() { + if len(tmpAssignments) != 0 { + return fmt.Errorf("default cpuset cannot be empty") + } + // state is empty initialize + allCPUs := p.topology.CPUDetails.CPUs() + s.SetDefaultCPUSet(allCPUs) + return nil + } + + // State has already been initialized from file (is not empty) + // 1 Check if the reserved cpuset is not part of default cpuset because: + // - kube/system reserved have changed (increased) - may lead to some containers not being able to start + // - user tampered with file + if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) { + return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"", + p.reserved.String(), tmpDefaultCPUset.String()) + } + + // 2. Check if state for static policy is consistent + for cID, cset := range tmpAssignments { + // None of the cpu in DEFAULT cset should be in s.assignments + if !tmpDefaultCPUset.Intersection(cset).IsEmpty() { + return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"", + cID, cset.String(), tmpDefaultCPUset.String()) + } + } + return nil } // assignableCPUs returns the set of unassigned CPUs minus the reserved set. diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index e6626f44675..f0e592ffd11 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -38,6 +38,7 @@ type staticPolicyTest struct { expErr error expCPUAlloc bool expCSet cpuset.CPUSet + expPanic bool } func TestStaticPolicyName(t *testing.T) { @@ -51,18 +52,73 @@ func TestStaticPolicyName(t *testing.T) { } func TestStaticPolicyStart(t *testing.T) { - policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy) - - st := &mockState{ - assignments: state.ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), + testCases := []staticPolicyTest{ + { + description: "non-corrupted state", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{ + "0": cpuset.NewCPUSet(0), + }, + stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + }, + { + description: "empty cpuset", + topo: topoDualSocketHT, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(), + expCSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + }, + { + description: "reserved cores 0 & 6 are not present in available cpuset", + topo: topoDualSocketHT, + numReservedCPUs: 2, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1), + expPanic: true, + }, + { + description: "assigned core 2 is still present in available cpuset", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{ + "0": cpuset.NewCPUSet(0, 1, 2), + }, + stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + expPanic: true, + }, } + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + defer func() { + if err := recover(); err != nil { + if !testCase.expPanic { + t.Errorf("unexpected panic occured: %q", err) + } + } else if testCase.expPanic { + t.Error("expected panic doesn't occured") + } + }() + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy) + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + policy.Start(st) - policy.Start(st) - for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ { - if !st.defaultCPUSet.Contains(cpuid) { - t.Errorf("StaticPolicy Start() error. expected cpuid %d to be present in defaultCPUSet", cpuid) - } + if !testCase.stDefaultCPUSet.IsEmpty() { + for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ { + if !st.defaultCPUSet.Contains(cpuid) { + t.Errorf("StaticPolicy Start() error. expected cpuid %d to be present in defaultCPUSet", cpuid) + } + } + } + if !st.GetDefaultCPUSet().Equals(testCase.expCSet) { + t.Errorf("State CPUSet is different than expected. Have %q wants: %q", st.GetDefaultCPUSet(), + testCase.expCSet) + } + + }) } } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go index b800613419f..20c7763350d 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -18,6 +18,7 @@ package state import ( "encoding/json" + "fmt" "github.com/golang/glog" "io/ioutil" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -26,6 +27,7 @@ import ( ) type stateFileData struct { + PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` } @@ -35,19 +37,21 @@ var _ State = &stateFile{} type stateFile struct { sync.RWMutex stateFilePath string + policyName string cache State } // NewFileState creates new State for keeping track of cpu/pod assignment with file backend -func NewFileState(filePath string) State { +func NewFileState(filePath string, policyName string) State { stateFile := &stateFile{ stateFilePath: filePath, cache: NewMemoryState(), + policyName: policyName, } if err := stateFile.tryRestoreState(); err != nil { // could not restore state, init new state file - glog.Infof("[cpumanager] state file: initializing empty state file") + glog.Infof("[cpumanager] state file: initializing empty state file - reason: \"%s\"", err) stateFile.cache.ClearState() stateFile.storeState() } @@ -85,6 +89,10 @@ func (sf *stateFile) tryRestoreState() error { return err } + if sf.policyName != readState.PolicyName { + return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) + } + if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) return err @@ -113,6 +121,7 @@ func (sf *stateFile) storeState() { var err error data := stateFileData{ + PolicyName: sf.policyName, DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), Entries: map[string]string{}, } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go index c04860ac3a9..cc653f9d9e2 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -98,7 +98,7 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore defaultCPUSet only", - "{ \"defaultCpuSet\": \"4-6\"}", + `{"policyName": "none", "defaultCpuSet": "4-6"}`, "", &stateMemory{ assignments: ContainerCPUAssignments{}, @@ -107,7 +107,7 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore defaultCPUSet only - invalid name", - "{ \"defCPUSet\": \"4-6\"}", + `{"policyName": "none", "defaultCpuSet" "4-6"}`, "", &stateMemory{ assignments: ContainerCPUAssignments{}, @@ -116,11 +116,13 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore assignments only", - "{" + - "\"entries\": { " + - "\"container1\": \"4-6\"," + - "\"container2\": \"1-3\"" + - "} }", + `{ + "policyName": "none", + "entries": { + "container1": "4-6", + "container2": "1-3" + } + }`, "", &stateMemory{ assignments: ContainerCPUAssignments{ @@ -132,7 +134,7 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore invalid assignments", - "{ \"entries\": }", + `{"entries": }`, "state file: could not unmarshal, corrupted state file", &stateMemory{ assignments: ContainerCPUAssignments{}, @@ -141,12 +143,14 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore valid file", - "{ " + - "\"defaultCpuSet\": \"23-24\", " + - "\"entries\": { " + - "\"container1\": \"4-6\", " + - "\"container2\": \"1-3\"" + - " } }", + `{ + "policyName": "none", + "defaultCpuSet": "23-24", + "entries": { + "container1": "4-6", + "container2": "1-3" + } + }`, "", &stateMemory{ assignments: ContainerCPUAssignments{ @@ -158,7 +162,10 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore un-parsable defaultCPUSet ", - "{ \"defaultCpuSet\": \"2-sd\" }", + `{ + "policyName": "none", + "defaultCpuSet": "2-sd" + }`, "state file: could not parse state file", &stateMemory{ assignments: ContainerCPUAssignments{}, @@ -167,12 +174,14 @@ func TestFileStateTryRestore(t *testing.T) { }, { "Try restore un-parsable assignments", - "{ " + - "\"defaultCpuSet\": \"23-24\", " + - "\"entries\": { " + - "\"container1\": \"p-6\", " + - "\"container2\": \"1-3\"" + - " } }", + `{ + "policyName": "none", + "defaultCpuSet": "23-24", + "entries": { + "container1": "p-6", + "container2": "1-3" + } + }`, "state file: could not parse state file", &stateMemory{ assignments: ContainerCPUAssignments{}, @@ -205,7 +214,7 @@ func TestFileStateTryRestore(t *testing.T) { defer os.Remove(sfilePath.Name()) logData, fileState := stderrCapture(t, func() State { - return NewFileState(sfilePath.Name()) + return NewFileState(sfilePath.Name(), "none") }) if tc.expErr != "" { @@ -250,7 +259,7 @@ func TestFileStateTryRestorePanic(t *testing.T) { } } }() - NewFileState(sfilePath) + NewFileState(sfilePath, "static") }) } @@ -302,6 +311,7 @@ func TestUpdateStateFile(t *testing.T) { } fileState := stateFile{ stateFilePath: sfilePath.Name(), + policyName: "static", cache: NewMemoryState(), } @@ -331,7 +341,7 @@ func TestUpdateStateFile(t *testing.T) { return } } - newFileState := NewFileState(sfilePath.Name()) + newFileState := NewFileState(sfilePath.Name(), "static") stateEqual(t, newFileState, tc.expectedState) }) } @@ -382,7 +392,7 @@ func TestHelpersStateFile(t *testing.T) { t.Errorf("cannot create temporary test file: %q", err.Error()) } - state := NewFileState(sfFile.Name()) + state := NewFileState(sfFile.Name(), "static") state.SetDefaultCPUSet(tc.defaultCPUset) for containerName, containerCPUs := range tc.containers { @@ -425,7 +435,7 @@ func TestClearStateStateFile(t *testing.T) { t.Errorf("cannot create temporary test file: %q", err.Error()) } - state := NewFileState(sfFile.Name()) + state := NewFileState(sfFile.Name(), "static") state.SetDefaultCPUSet(testCase.defaultCPUset) for containerName, containerCPUs := range testCase.containers { state.SetCPUSet(containerName, containerCPUs)