Migrate cpumanager to use checkpointing manager

This commit is contained in:
Klaudiusz Dembler 2018-02-01 19:35:14 +01:00 committed by Klaudiusz Dembler
parent 0989967374
commit 6bfceed4ab
No known key found for this signature in database
GPG Key ID: 14B9FB649EE34C35
3 changed files with 261 additions and 3 deletions

View File

@ -137,9 +137,10 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
policy = NewNonePolicy()
}
stateImpl := state.NewFileState(
path.Join(stateFileDirecory, CPUManagerStateFileName),
policy.Name())
stateImpl, err := state.NewCheckpointState(stateFileDirecory, policy.Name())
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
}
manager := &manager{
policy: policy,

View File

@ -0,0 +1,72 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"encoding/json"
"hash/fnv"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
var _ checkpointmanager.Checkpoint = &cpuManagerCheckpoint {}
// cpuManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint
type cpuManagerCheckpoint struct {
PolicyName string
DefaultCPUSet string
Entries map[string]string
Checksum uint64
}
// NewCPUManagerCheckpoint returns an instance of Checkpoint
func NewCPUManagerCheckpoint() *cpuManagerCheckpoint {
return &cpuManagerCheckpoint{Entries: make(map[string]string)}
}
// MarshalCheckpoint returns marshalled checkpoint
func (cp *cpuManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(*cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (cp *cpuManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
if err := json.Unmarshal(blob, cp); err != nil {
return err
}
if cp.Checksum != cp.GetChecksum() {
return errors.ErrCorruptCheckpoint
}
return nil
}
// GetChecksum returns calculated checksum of checkpoint
func (cp *cpuManagerCheckpoint) GetChecksum() uint64 {
orig := cp.Checksum
cp.Checksum = 0
hash := fnv.New32a()
hashutil.DeepHashObject(hash, *cp)
cp.Checksum = orig
return uint64(hash.Sum32())
}
// UpdateChecksum calculates and updates checksum of the checkpoint
func (cp *cpuManagerCheckpoint) UpdateChecksum() {
cp.Checksum = cp.GetChecksum()
}

View File

@ -0,0 +1,185 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"path"
"sync"
)
// cpuManagerCheckpointName is the name of checkpoint file
const cpuManagerCheckpointName = "cpu_manager_state"
var _ State = &stateCheckpoint{}
type stateCheckpoint struct {
mux sync.RWMutex
policyName string
cache State
checkpointManager checkpointmanager.CheckpointManager
}
// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
func NewCheckpointState(stateDir string, policyName string) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
cache: NewMemoryState(),
policyName: policyName,
checkpointManager: checkpointManager,
}
if err := stateCheckpoint.tryRestoreState(); err != nil {
return nil, fmt.Errorf("unable to restore state from checkpoint: %v\n"+
"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
err, path.Join(stateDir, cpuManagerCheckpointName))
}
return stateCheckpoint, nil
}
// tryRestoreState tries to read checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) tryRestoreState() error {
sc.mux.Lock()
defer sc.mux.Unlock()
var err error
// used when all parsing is ok
tmpAssignments := make(ContainerCPUAssignments)
tmpDefaultCPUSet := cpuset.NewCPUSet()
tmpContainerCPUSet := cpuset.NewCPUSet()
checkpoint := NewCPUManagerCheckpoint()
if err = sc.checkpointManager.GetCheckpoint(cpuManagerCheckpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
sc.storeState()
return nil
}
return err
}
if sc.policyName != checkpoint.PolicyName {
return fmt.Errorf("policy configured %q != policy from state checkpoint %q", sc.policyName, checkpoint.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil {
return fmt.Errorf("could not parse state checkpoint - [defaultCpuSet: %q]", checkpoint.DefaultCPUSet)
}
for containerID, cpuString := range checkpoint.Entries {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
return fmt.Errorf("could not parse state checkpoint - container id: %q, cpuset: %q", containerID, cpuString)
}
tmpAssignments[containerID] = tmpContainerCPUSet
}
sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
sc.cache.SetCPUAssignments(tmpAssignments)
glog.V(2).Info("[cpumanager] state checkpoint: restored state from checkpoint")
glog.V(2).Infof("[cpumanager] state checkpoint: defaultCPUSet: %s", tmpDefaultCPUSet.String())
return nil
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() {
checkpoint := NewCPUManagerCheckpoint()
checkpoint.PolicyName = sc.policyName
checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
for containerID, cset := range sc.cache.GetCPUAssignments() {
checkpoint.Entries[containerID] = cset.String()
}
err := sc.checkpointManager.CreateCheckpoint(cpuManagerCheckpointName, checkpoint)
if err != nil {
panic("[cpumanager] could not save checkpoint: " + err.Error())
}
}
func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
sc.mux.RLock()
defer sc.mux.RUnlock()
res, ok := sc.cache.GetCPUSet(containerID)
return res, ok
}
func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetDefaultCPUSet()
}
func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetCPUSetOrDefault(containerID)
}
func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetCPUAssignments()
}
func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetCPUSet(containerID, cset)
sc.storeState()
}
func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetDefaultCPUSet(cset)
sc.storeState()
}
func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetCPUAssignments(a)
sc.storeState()
}
func (sc *stateCheckpoint) Delete(containerID string) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.Delete(containerID)
sc.storeState()
}
func (sc *stateCheckpoint) ClearState() {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.ClearState()
sc.storeState()
}