diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index b52fe59a12d..ed185bc2cc5 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -137,9 +137,10 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo policy = NewNonePolicy() } - stateImpl := state.NewFileState( - path.Join(stateFileDirecory, CPUManagerStateFileName), - policy.Name()) + stateImpl, err := state.NewCheckpointState(stateFileDirecory, policy.Name()) + if err != nil { + return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err) + } manager := &manager{ policy: policy, diff --git a/pkg/kubelet/cm/cpumanager/state/checkpoint.go b/pkg/kubelet/cm/cpumanager/state/checkpoint.go new file mode 100644 index 00000000000..e1a86b1b021 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/checkpoint.go @@ -0,0 +1,72 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package state + +import ( + "encoding/json" + "hash/fnv" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + hashutil "k8s.io/kubernetes/pkg/util/hash" +) + +var _ checkpointmanager.Checkpoint = &cpuManagerCheckpoint {} + +// cpuManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint +type cpuManagerCheckpoint struct { + PolicyName string + DefaultCPUSet string + Entries map[string]string + Checksum uint64 +} + +// NewCPUManagerCheckpoint returns an instance of Checkpoint +func NewCPUManagerCheckpoint() *cpuManagerCheckpoint { + return &cpuManagerCheckpoint{Entries: make(map[string]string)} +} + +// MarshalCheckpoint returns marshalled checkpoint +func (cp *cpuManagerCheckpoint) MarshalCheckpoint() ([]byte, error) { + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint +func (cp *cpuManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error { + if err := json.Unmarshal(blob, cp); err != nil { + return err + } + if cp.Checksum != cp.GetChecksum() { + return errors.ErrCorruptCheckpoint + } + return nil +} + +// GetChecksum returns calculated checksum of checkpoint +func (cp *cpuManagerCheckpoint) GetChecksum() uint64 { + orig := cp.Checksum + cp.Checksum = 0 + hash := fnv.New32a() + hashutil.DeepHashObject(hash, *cp) + cp.Checksum = orig + return uint64(hash.Sum32()) +} + +// UpdateChecksum calculates and updates checksum of the checkpoint +func (cp *cpuManagerCheckpoint) UpdateChecksum() { + cp.Checksum = cp.GetChecksum() +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go new file mode 100644 index 00000000000..0656fa9954f --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go @@ -0,0 +1,185 @@ +/* +Copyright 2018 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"fmt"
+	"path"
+	"sync"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+)
+
+// cpuManagerCheckpointName is the name of checkpoint file
+const cpuManagerCheckpointName = "cpu_manager_state"
+
+// Compile-time assertion that *stateCheckpoint implements State.
+var _ State = &stateCheckpoint{}
+
+// stateCheckpoint is a State implementation that keeps the working copy in an
+// in-memory State and writes a checkpoint after every mutation.
+type stateCheckpoint struct {
+	// mux guards cache and serializes checkpoint writes.
+	mux sync.RWMutex
+	// policyName is recorded in every checkpoint and must match the policy
+	// name found in a checkpoint being restored.
+	policyName string
+	// cache holds the current assignments; all reads are served from it.
+	cache State
+	// checkpointManager reads and writes the checkpoint.
+	checkpointManager checkpointmanager.CheckpointManager
+}
+
+// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend.
+//
+// stateDir is handed to the checkpoint manager and policyName is the policy
+// the restored checkpoint must have been written with. An error is returned
+// if the checkpoint manager cannot be created or if an existing checkpoint
+// cannot be restored.
+func NewCheckpointState(stateDir string, policyName string) (State, error) {
+	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
+	}
+	stateCheckpoint := &stateCheckpoint{
+		cache:             NewMemoryState(),
+		policyName:        policyName,
+		checkpointManager: checkpointManager,
+	}
+
+	if err := stateCheckpoint.tryRestoreState(); err != nil {
+		return nil, fmt.Errorf("unable to restore state from checkpoint: %v\n"+
+			"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
+			err, path.Join(stateDir, cpuManagerCheckpointName))
+	}
+
+	return stateCheckpoint, nil
+}
+
+// tryRestoreState tries to read checkpoint and creates it if it doesn't exist.
+//
+// It returns an error when the checkpoint cannot be read, was written by a
+// different policy, or contains a CPU set string that does not parse. The
+// cache is only modified after the whole checkpoint has been validated.
+func (sc *stateCheckpoint) tryRestoreState() error {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+
+	checkpoint := NewCPUManagerCheckpoint()
+	if err := sc.checkpointManager.GetCheckpoint(cpuManagerCheckpointName, checkpoint); err != nil {
+		if err == errors.ErrCheckpointNotFound {
+			// Nothing to restore: write a checkpoint of the current cache.
+			// storeState panics if that write fails.
+			sc.storeState()
+			return nil
+		}
+		return err
+	}
+
+	if sc.policyName != checkpoint.PolicyName {
+		return fmt.Errorf("policy configured %q != policy from state checkpoint %q", sc.policyName, checkpoint.PolicyName)
+	}
+
+	// Parse everything into locals first so a malformed entry cannot leave the
+	// cache partially populated.
+	defaultCPUSet, err := cpuset.Parse(checkpoint.DefaultCPUSet)
+	if err != nil {
+		return fmt.Errorf("could not parse state checkpoint - [defaultCpuSet: %q]: %v", checkpoint.DefaultCPUSet, err)
+	}
+
+	assignments := make(ContainerCPUAssignments, len(checkpoint.Entries))
+	for containerID, cpuString := range checkpoint.Entries {
+		containerCPUSet, err := cpuset.Parse(cpuString)
+		if err != nil {
+			return fmt.Errorf("could not parse state checkpoint - container id: %q, cpuset: %q: %v", containerID, cpuString, err)
+		}
+		assignments[containerID] = containerCPUSet
+	}
+
+	sc.cache.SetDefaultCPUSet(defaultCPUSet)
+	sc.cache.SetCPUAssignments(assignments)
+
+	glog.V(2).Info("[cpumanager] state checkpoint: restored state from checkpoint")
+	glog.V(2).Infof("[cpumanager] state checkpoint: defaultCPUSet: %s", defaultCPUSet.String())
+
+	return nil
+}
+
+// storeState saves state to a checkpoint, caller is responsible for locking.
+// It panics if the checkpoint cannot be created.
+func (sc *stateCheckpoint) storeState() {
+	checkpoint := NewCPUManagerCheckpoint()
+	checkpoint.PolicyName = sc.policyName
+	checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
+
+	for containerID, cset := range sc.cache.GetCPUAssignments() {
+		checkpoint.Entries[containerID] = cset.String()
+	}
+
+	if err := sc.checkpointManager.CreateCheckpoint(cpuManagerCheckpointName, checkpoint); err != nil {
+		panic("[cpumanager] could not save checkpoint: " + err.Error())
+	}
+}
+
+// GetCPUSet returns what the in-memory cache reports for containerID.
+func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
+	sc.mux.RLock()
+	defer sc.mux.RUnlock()
+
+	return sc.cache.GetCPUSet(containerID)
+}
+
+// GetDefaultCPUSet returns the default CPU set held by the in-memory cache.
+func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
+	sc.mux.RLock()
+	defer sc.mux.RUnlock()
+
+	return sc.cache.GetDefaultCPUSet()
+}
+
+// GetCPUSetOrDefault returns the result of the cache's GetCPUSetOrDefault for
+// containerID.
+func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
+	sc.mux.RLock()
+	defer sc.mux.RUnlock()
+
+	return sc.cache.GetCPUSetOrDefault(containerID)
+}
+
+// GetCPUAssignments returns the container assignments held by the in-memory
+// cache.
+func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
+	sc.mux.RLock()
+	defer sc.mux.RUnlock()
+
+	return sc.cache.GetCPUAssignments()
+}
+
+// SetCPUSet records cset for containerID in the cache and then writes a
+// checkpoint. It panics if the checkpoint cannot be saved.
+func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	sc.cache.SetCPUSet(containerID, cset)
+	sc.storeState()
+}
+
+// SetDefaultCPUSet replaces the default CPU set in the cache and then writes a
+// checkpoint. It panics if the checkpoint cannot be saved.
+func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	sc.cache.SetDefaultCPUSet(cset)
+	sc.storeState()
+}
+
+// SetCPUAssignments passes a to the cache's SetCPUAssignments and then writes
+// a checkpoint. It panics if the checkpoint cannot be saved.
+func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	sc.cache.SetCPUAssignments(a)
+	sc.storeState()
+}
+
+// Delete calls the cache's Delete for containerID and then writes a
+// checkpoint. It panics if the checkpoint cannot be saved.
+func (sc *stateCheckpoint) Delete(containerID string) {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	sc.cache.Delete(containerID)
+	sc.storeState()
+}
+
+// ClearState calls the cache's ClearState and then writes a checkpoint of the
+// result. It panics if the checkpoint cannot be saved.
+func (sc *stateCheckpoint) ClearState() {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	sc.cache.ClearState()
+	sc.storeState()
+}