Enable file state in static policy

This commit is contained in:
Szymon Scharmach 2017-10-17 14:23:39 +02:00 committed by Szymon Scharmach
parent 210626577b
commit 7e7301ffaf
6 changed files with 63 additions and 6 deletions

View File

@ -462,6 +462,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
CgroupsPerQOS: s.CgroupsPerQOS, CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot, CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver, CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults, ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{ NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup, KubeReservedCgroupName: s.KubeReservedCgroup,

View File

@ -96,6 +96,7 @@ type NodeConfig struct {
CgroupsPerQOS bool CgroupsPerQOS bool
CgroupRoot string CgroupRoot string
CgroupDriver string CgroupDriver string
KubeletRootDir string
ProtectKernelDefaults bool ProtectKernelDefaults bool
NodeAllocatableConfig NodeAllocatableConfig
ExperimentalQOSReserved map[v1.ResourceName]int64 ExperimentalQOSReserved map[v1.ResourceName]int64

View File

@ -291,6 +291,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
nodeConfig.ExperimentalCPUManagerReconcilePeriod, nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo, machineInfo,
cm.GetNodeAllocatableReservation(), cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
) )
if err != nil { if err != nil {
glog.Errorf("failed to initialize cpu manager: %v", err) glog.Errorf("failed to initialize cpu manager: %v", err)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"path"
) )
// ActivePodsFunc is a function that returns a list of pods to reconcile. // ActivePodsFunc is a function that returns a list of pods to reconcile.
@ -44,6 +45,9 @@ type runtimeService interface {
type policyName string type policyName string
// CPUManagerStateFileName is the name of the file where the cpu manager stores its state
const CPUManagerStateFileName = "cpu_manager_state"
// Manager interface provides methods for Kubelet to manage pod cpus. // Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface { type Manager interface {
// Start is called during Kubelet initialization. // Start is called during Kubelet initialization.
@ -99,13 +103,16 @@ func NewManager(
reconcilePeriod time.Duration, reconcilePeriod time.Duration,
machineInfo *cadvisorapi.MachineInfo, machineInfo *cadvisorapi.MachineInfo,
nodeAllocatableReservation v1.ResourceList, nodeAllocatableReservation v1.ResourceList,
stateFileDirecory string,
) (Manager, error) { ) (Manager, error) {
var policy Policy var policy Policy
var stateHandle state.State
switch policyName(cpuPolicyName) { switch policyName(cpuPolicyName) {
case PolicyNone: case PolicyNone:
policy = NewNonePolicy() policy = NewNonePolicy()
stateHandle = state.NewMemoryState()
case PolicyStatic: case PolicyStatic:
topo, err := topology.Discover(machineInfo) topo, err := topology.Discover(machineInfo)
@ -134,16 +141,18 @@ func NewManager(
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy = NewStaticPolicy(topo, numReservedCPUs) policy = NewStaticPolicy(topo, numReservedCPUs)
stateHandle = state.NewFileState(path.Join(stateFileDirecory, CPUManagerStateFileName), policy.Name())
default: default:
glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone) glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
policy = NewNonePolicy() policy = NewNonePolicy()
stateHandle = state.NewMemoryState()
} }
manager := &manager{ manager := &manager{
policy: policy, policy: policy,
reconcilePeriod: reconcilePeriod, reconcilePeriod: reconcilePeriod,
state: state.NewMemoryState(), state: stateHandle,
machineInfo: machineInfo, machineInfo: machineInfo,
nodeAllocatableReservation: nodeAllocatableReservation, nodeAllocatableReservation: nodeAllocatableReservation,
} }

View File

@ -109,9 +109,45 @@ func (p *staticPolicy) Name() string {
} }
func (p *staticPolicy) Start(s state.State) { func (p *staticPolicy) Start(s state.State) {
// Configure the shared pool to include all detected CPU IDs. if err := p.validateState(s); err != nil {
allCPUs := p.topology.CPUDetails.CPUs() glog.Errorf("[cpumanager] static policy invalid state: %s\n", err.Error())
s.SetDefaultCPUSet(allCPUs) panic("[cpumanager] - please drain node and remove policy state file")
}
}
func (p *staticPolicy) validateState(s state.State) error {
tmpAssignments := s.GetCPUAssignments()
tmpDefaultCPUset := s.GetDefaultCPUSet()
// Default cpuset cannot be empty when assignments exist
if tmpDefaultCPUset.IsEmpty() {
if len(tmpAssignments) != 0 {
return fmt.Errorf("default cpuset cannot be empty")
}
// state is empty initialize
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
return nil
}
// State has already been initialized from file (is not empty)
// 1 Check if the reserved cpuset is not part of default cpuset because:
// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
// - user tampered with file
if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) {
return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
p.reserved.String(), tmpDefaultCPUset.String())
}
// 2. Check if state for static policy is consistent
for cID, cset := range tmpAssignments {
// None of the cpu in DEFAULT cset should be in s.assignments
if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
cID, cset.String(), tmpDefaultCPUset.String())
}
}
return nil
} }
// assignableCPUs returns the set of unassigned CPUs minus the reserved set. // assignableCPUs returns the set of unassigned CPUs minus the reserved set.

View File

@ -18,6 +18,7 @@ package state
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"io/ioutil" "io/ioutil"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@ -26,6 +27,7 @@ import (
) )
type stateFileData struct { type stateFileData struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"` DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"` Entries map[string]string `json:"entries,omitempty"`
} }
@ -35,19 +37,21 @@ var _ State = &stateFile{}
type stateFile struct { type stateFile struct {
sync.RWMutex sync.RWMutex
stateFilePath string stateFilePath string
policyName string
cache State cache State
} }
// NewFileState creates new State for keeping track of cpu/pod assignment with file backend // NewFileState creates new State for keeping track of cpu/pod assignment with file backend
func NewFileState(filePath string) State { func NewFileState(filePath string, policyName string) State {
stateFile := &stateFile{ stateFile := &stateFile{
stateFilePath: filePath, stateFilePath: filePath,
cache: NewMemoryState(), cache: NewMemoryState(),
policyName: policyName,
} }
if err := stateFile.tryRestoreState(); err != nil { if err := stateFile.tryRestoreState(); err != nil {
// could not restore state, init new state file // could not restore state, init new state file
glog.Infof("[cpumanager] state file: initializing empty state file") glog.Infof("[cpumanager] state file: initializing empty state file - reason: \"%s\"")
stateFile.cache.ClearState() stateFile.cache.ClearState()
stateFile.storeState() stateFile.storeState()
} }
@ -85,6 +89,10 @@ func (sf *stateFile) tryRestoreState() error {
return err return err
} }
if sf.policyName != readState.PolicyName {
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil {
glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet)
return err return err
@ -113,6 +121,7 @@ func (sf *stateFile) storeState() {
var err error var err error
data := stateFileData{ data := stateFileData{
PolicyName: sf.policyName,
DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(),
Entries: map[string]string{}, Entries: map[string]string{},
} }