diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index fae628c98f1..1ab11d3b3b9 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -462,6 +462,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { CgroupsPerQOS: s.CgroupsPerQOS, CgroupRoot: s.CgroupRoot, CgroupDriver: s.CgroupDriver, + KubeletRootDir: s.RootDirectory, ProtectKernelDefaults: s.ProtectKernelDefaults, NodeAllocatableConfig: cm.NodeAllocatableConfig{ KubeReservedCgroupName: s.KubeReservedCgroup, diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index ce19634d3f2..6c772e27b04 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -96,6 +96,7 @@ type NodeConfig struct { CgroupsPerQOS bool CgroupRoot string CgroupDriver string + KubeletRootDir string ProtectKernelDefaults bool NodeAllocatableConfig ExperimentalQOSReserved map[v1.ResourceName]int64 diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 22c4dbf1597..a1f8f054e10 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -291,6 +291,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I nodeConfig.ExperimentalCPUManagerReconcilePeriod, machineInfo, cm.GetNodeAllocatableReservation(), + nodeConfig.KubeletRootDir, ) if err != nil { glog.Errorf("failed to initialize cpu manager: %v", err) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index b91fdfa3ef0..d8ee14930a0 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" + "path" ) // ActivePodsFunc is a function that returns a list of pods to reconcile. @@ -44,6 +45,9 @@ type runtimeService interface { type policyName string +// CPUManagerStateFileName is the name file name where cpu manager stores it's state +const CPUManagerStateFileName = "cpu_manager_state" + // Manager interface provides methods for Kubelet to manage pod cpus. type Manager interface { // Start is called during Kubelet initialization. @@ -99,13 +103,16 @@ func NewManager( reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, + stateFileDirecory string, ) (Manager, error) { var policy Policy + var stateHandle state.State switch policyName(cpuPolicyName) { case PolicyNone: policy = NewNonePolicy() + stateHandle = state.NewMemoryState() case PolicyStatic: topo, err := topology.Discover(machineInfo) @@ -134,16 +141,18 @@ func NewManager( reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) policy = NewStaticPolicy(topo, numReservedCPUs) + stateHandle = state.NewFileState(path.Join(stateFileDirecory, CPUManagerStateFileName), policy.Name()) default: glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone) policy = NewNonePolicy() + stateHandle = state.NewMemoryState() } manager := &manager{ policy: policy, reconcilePeriod: reconcilePeriod, - state: state.NewMemoryState(), + state: stateHandle, machineInfo: machineInfo, nodeAllocatableReservation: nodeAllocatableReservation, } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 1eca50b91f4..35c925f6fc2 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -109,9 +109,45 @@ func (p *staticPolicy) Name() string { } func (p *staticPolicy) Start(s state.State) { - // Configure the shared pool to include all detected CPU IDs. - allCPUs := p.topology.CPUDetails.CPUs() - s.SetDefaultCPUSet(allCPUs) + if err := p.validateState(s); err != nil { + glog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error()) + panic("[cpumanager] - please drain node and remove policy state file") + } +} + +func (p *staticPolicy) validateState(s state.State) error { + tmpAssignments := s.GetCPUAssignments() + tmpDefaultCPUset := s.GetDefaultCPUSet() + + // Default cpuset cannot be empty when assignments exist + if tmpDefaultCPUset.IsEmpty() { + if len(tmpAssignments) != 0 { + return fmt.Errorf("default cpuset cannot be empty") + } + // state is empty initialize + allCPUs := p.topology.CPUDetails.CPUs() + s.SetDefaultCPUSet(allCPUs) + return nil + } + + // State has already been initialized from file (is not empty) + // 1 Check if the reserved cpuset is not part of default cpuset because: + // - kube/system reserved have changed (increased) - may lead to some containers not being able to start + // - user tampered with file + if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) { + return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"", + p.reserved.String(), tmpDefaultCPUset.String()) + } + + // 2. Check if state for static policy is consistent + for cID, cset := range tmpAssignments { + // None of the cpu in DEFAULT cset should be in s.assignments + if !tmpDefaultCPUset.Intersection(cset).IsEmpty() { + return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"", + cID, cset.String(), tmpDefaultCPUset.String()) + } + } + return nil } // assignableCPUs returns the set of unassigned CPUs minus the reserved set. diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go index b800613419f..b9c54f4bce1 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -18,6 +18,7 @@ package state import ( "encoding/json" + "fmt" "github.com/golang/glog" "io/ioutil" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -26,6 +27,7 @@ import ( ) type stateFileData struct { + PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` } @@ -35,19 +37,21 @@ var _ State = &stateFile{} type stateFile struct { sync.RWMutex stateFilePath string + policyName string cache State } // NewFileState creates new State for keeping track of cpu/pod assignment with file backend -func NewFileState(filePath string) State { +func NewFileState(filePath string, policyName string) State { stateFile := &stateFile{ stateFilePath: filePath, cache: NewMemoryState(), + policyName: policyName, } if err := stateFile.tryRestoreState(); err != nil { // could not restore state, init new state file - glog.Infof("[cpumanager] state file: initializing empty state file") + glog.Infof("[cpumanager] state file: initializing empty state file - reason: \"%s\"") stateFile.cache.ClearState() stateFile.storeState() } @@ -85,6 +89,10 @@ func (sf *stateFile) tryRestoreState() error { return err } + if sf.policyName != readState.PolicyName { + return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) + } + if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) return err @@ -113,6 +121,7 @@ func (sf *stateFile) storeState() { var err error data := stateFileData{ + PolicyName: sf.policyName, DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), Entries: map[string]string{}, }