From cedbd932550d15a3e649ddb02ac74784ade32f06 Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Wed, 29 Nov 2017 13:19:39 -0500 Subject: [PATCH] Make 'pod' package to use unified checkpointManager Signed-off-by: vikaschoudhary16 --- hack/.golint_failures | 2 +- pkg/kubelet/BUILD | 1 + pkg/kubelet/checkpoint/BUILD | 6 +- pkg/kubelet/checkpoint/checkpoint.go | 139 ++++++++---------- pkg/kubelet/checkpoint/checkpoint_test.go | 12 +- pkg/kubelet/checkpointmanager/BUILD | 2 +- pkg/kubelet/checkpointmanager/README.md | 3 +- .../checkpointmanager/checkpoint_manager.go | 4 + .../checkpoint_manager_test.go | 7 +- .../checkpointmanager/errors/errors.go | 7 +- pkg/kubelet/checkpointmanager/testing/util.go | 5 + pkg/kubelet/cm/devicemanager/manager_test.go | 4 +- pkg/kubelet/cm/devicemanager/pod_devices.go | 7 +- pkg/kubelet/config/BUILD | 1 + pkg/kubelet/config/config.go | 13 +- pkg/kubelet/dockershim/BUILD | 1 - pkg/kubelet/dockershim/docker_service_test.go | 2 +- pkg/kubelet/kubelet.go | 10 +- pkg/kubelet/kubelet_test.go | 2 +- pkg/kubelet/mountpod/mount_pod_test.go | 2 +- pkg/kubelet/pod/BUILD | 1 + pkg/kubelet/pod/pod_manager.go | 11 +- pkg/kubelet/pod/pod_manager_test.go | 2 +- pkg/kubelet/pod/testing/BUILD | 2 + pkg/kubelet/pod/testing/fake_mirror_client.go | 36 +++++ pkg/kubelet/prober/common_test.go | 2 +- pkg/kubelet/prober/worker_test.go | 2 +- pkg/kubelet/runonce_test.go | 2 +- pkg/kubelet/status/status_manager_test.go | 2 +- .../desired_state_of_world_populator_test.go | 2 +- .../volumemanager/volume_manager_test.go | 9 +- 31 files changed, 176 insertions(+), 125 deletions(-) diff --git a/hack/.golint_failures b/hack/.golint_failures index 824de2477d9..ef0253f94bb 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -168,6 +168,7 @@ pkg/kubelet/checkpointmanager/checksum pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1 pkg/kubelet/client pkg/kubelet/cm +pkg/kubelet/cm/devicemanager/checkpoint pkg/kubelet/cm/util 
pkg/kubelet/config pkg/kubelet/configmap @@ -184,7 +185,6 @@ pkg/kubelet/dockershim/network/hostport pkg/kubelet/dockershim/network/hostport/testing pkg/kubelet/dockershim/network/kubenet pkg/kubelet/dockershim/network/testing -pkg/kubelet/dockershim/testing pkg/kubelet/events pkg/kubelet/images pkg/kubelet/kuberuntime diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index b7657b78239..d2b6099c8f9 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -46,6 +46,7 @@ go_library( "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/certificate:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/configmap:go_default_library", diff --git a/pkg/kubelet/checkpoint/BUILD b/pkg/kubelet/checkpoint/BUILD index c04d198192c..0be2f14b41c 100644 --- a/pkg/kubelet/checkpoint/BUILD +++ b/pkg/kubelet/checkpoint/BUILD @@ -7,9 +7,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/core:go_default_library", - "//pkg/volume/util:go_default_library", - "//vendor/github.com/dchest/safefile:go_default_library", - "//vendor/github.com/ghodss/yaml:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", ], @@ -21,6 +20,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", ], diff --git a/pkg/kubelet/checkpoint/checkpoint.go b/pkg/kubelet/checkpoint/checkpoint.go index d4b30f902d9..e5801312d6a 100644 --- a/pkg/kubelet/checkpoint/checkpoint.go +++ b/pkg/kubelet/checkpoint/checkpoint.go @@ 
-17,20 +17,15 @@ limitations under the License. package checkpoint import ( + "encoding/json" "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "github.com/dchest/safefile" - "github.com/ghodss/yaml" "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) const ( @@ -39,54 +34,44 @@ const ( podPrefix = "Pod" ) -// Manager is the interface used to manage checkpoints -// which involves writing resources to disk to recover -// during restart or failure scenarios. -// https://github.com/kubernetes/community/pull/1241/files -type Manager interface { - // LoadPods will load checkpointed Pods from disk - LoadPods() ([]*v1.Pod, error) - - // WritePod will serialize a Pod to disk - WritePod(pod *v1.Pod) error - - // Deletes the checkpoint of the given pod from disk - DeletePod(pod *v1.Pod) error +type PodCheckpoint interface { + checkpointmanager.Checkpoint + GetPod() *v1.Pod } -var instance Manager -var mutex = &sync.Mutex{} - -// fileCheckPointManager - is a checkpointer that writes contents to disk -// The type information of the resource objects are encoded in the name -type fileCheckPointManager struct { - path string +// Data to be stored as checkpoint +type Data struct { + Pod *v1.Pod + Checksum checksum.Checksum } -// NewCheckpointManager will create a Manager that points to the following path -func NewCheckpointManager(path string) Manager { - // NOTE: This is a precaution; current implementation should not run - // multiple checkpoint managers. - mutex.Lock() - defer mutex.Unlock() - instance = &fileCheckPointManager{path: path} - return instance +// NewPodCheckpoint returns new pod checkpoint +func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint { + return &Data{Pod: pod} } -// GetInstance will return the current Manager, there should be only one. 
-func GetInstance() Manager { - mutex.Lock() - defer mutex.Unlock() - return instance +// MarshalCheckpoint records the checksum of the pod in the checkpoint and returns the checkpoint encoded as JSON +func (cp *Data) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Pod) + return json.Marshal(*cp) } -// loadPod will load Pod Checkpoint yaml file. -func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) { - return util.LoadPodFromFile(file) +// UnmarshalCheckpoint decodes the JSON blob into the checkpoint +func (cp *Data) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum checks the stored checksum against the pod held by the checkpoint +func (cp *Data) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Pod) +} + +func (cp *Data) GetPod() *v1.Pod { + return cp.Pod } // checkAnnotations will validate the checkpoint annotations exist on the Pod -func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { +func checkAnnotations(pod *v1.Pod) bool { if podAnnotations := pod.GetAnnotations(); podAnnotations != nil { if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" { return true @@ -95,57 +80,49 @@ func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { return false } -// getPodPath returns the full qualified path for the pod checkpoint -func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string { - return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID()) +// getPodKey returns the checkpoint key for the pod, built from the pod UID +func getPodKey(pod *v1.Pod) string { + return fmt.Sprintf("Pod%v%v.yaml", delimiter, pod.GetUID()) } // LoadPods Loads All Checkpoints from disk -func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) { - checkpoints := make([]*v1.Pod, 0) - files, err := ioutil.ReadDir(fcp.path) +func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) { + pods := make([]*v1.Pod, 0) + + var err error + checkpointKeys := []string{} + checkpointKeys, err = cpm.ListCheckpoints() if 
err != nil { - return nil, err + glog.Errorf("Failed to list checkpoints: %v", err) } - for _, f := range files { - // get just the filename - _, fname := filepath.Split(f.Name()) - // Get just the Resource from "Resource_Name" - fnfields := strings.Split(fname, delimiter) - switch fnfields[0] { - case podPrefix: - pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name())) - if err != nil { - return nil, err - } - checkpoints = append(checkpoints, pod) - default: - glog.Warningf("Unsupported checkpoint file detected %v", f) + + for _, key := range checkpointKeys { + checkpoint := NewPodCheckpoint(nil) + err := cpm.GetCheckpoint(key, checkpoint) + if err != nil { + glog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err) + continue } + pods = append(pods, checkpoint.GetPod()) } - return checkpoints, nil + return pods, nil } -// Writes a checkpoint to a file on disk if annotation is present -func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error { +// WritePod a checkpoint to a file on disk if annotation is present +func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error { var err error - if fcp.checkAnnotations(pod) { - if blob, err := yaml.Marshal(pod); err == nil { - err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644) - } + if checkAnnotations(pod) { + data := NewPodCheckpoint(pod) + err = cpm.CreateCheckpoint(getPodKey(pod), data) } else { // This is to handle an edge where a pod update could remove // an annotation and the checkpoint should then be removed. 
- err = fcp.DeletePod(pod) + err = cpm.RemoveCheckpoint(getPodKey(pod)) } return err } -// Deletes a checkpoint from disk if present -func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error { - podPath := fcp.getPodPath(pod) - if err := os.Remove(podPath); !os.IsNotExist(err) { - return err - } - return nil +// DeletePod deletes a checkpoint from disk if present +func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error { + return cpm.RemoveCheckpoint(getPodKey(pod)) } diff --git a/pkg/kubelet/checkpoint/checkpoint_test.go b/pkg/kubelet/checkpoint/checkpoint_test.go index 23c35c2c019..e93e61d4ee8 100644 --- a/pkg/kubelet/checkpoint/checkpoint_test.go +++ b/pkg/kubelet/checkpoint/checkpoint_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" ) // TestWriteLoadDeletePods validates all combinations of write, load, and delete @@ -70,15 +71,18 @@ func TestWriteLoadDeletePods(t *testing.T) { } defer os.RemoveAll(dir) - cp := NewCheckpointManager(dir) + cpm, err := checkpointmanager.NewCheckpointManager(dir) + if err != nil { + t.Errorf("Failed to initialize checkpoint manager error=%v", err) + } for _, p := range testPods { // Write pods should always pass unless there is an fs error - if err := cp.WritePod(p.pod); err != nil { + if err := WritePod(cpm, p.pod); err != nil { t.Errorf("Failed to Write Pod: %v", err) } } // verify the correct written files are loaded from disk - pods, err := cp.LoadPods() + pods, err := LoadPods(cpm) if err != nil { t.Errorf("Failed to Load Pods: %v", err) } @@ -104,7 +108,7 @@ func TestWriteLoadDeletePods(t *testing.T) { } else if lpod != nil { t.Errorf("Got unexpected result for %v, should not have been loaded", pname) } - err = cp.DeletePod(p.pod) + err = DeletePod(cpm, p.pod) if err != nil { t.Errorf("Failed to delete pod %v", pname) } diff --git 
a/pkg/kubelet/checkpointmanager/BUILD b/pkg/kubelet/checkpointmanager/BUILD index c27bb8f5898..0353c2d1ca8 100644 --- a/pkg/kubelet/checkpointmanager/BUILD +++ b/pkg/kubelet/checkpointmanager/BUILD @@ -22,7 +22,7 @@ go_test( srcs = ["checkpoint_manager_test.go"], embed = [":go_default_library"], deps = [ - "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/checkpointmanager/testing:go_default_library", "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/kubelet/checkpointmanager/README.md b/pkg/kubelet/checkpointmanager/README.md index d5b6fd6eb74..07beef1300f 100644 --- a/pkg/kubelet/checkpointmanager/README.md +++ b/pkg/kubelet/checkpointmanager/README.md @@ -1,8 +1,9 @@ ## DISCLAIMER -Sig-Node community has reached a general consensus, as a best practice, to +- Sig-Node community has reached a general consensus, as a best practice, to avoid introducing any new checkpointing support. We reached this understanding after struggling with some hard-to-debug issues in the production environments caused by the checkpointing. +- Any changes to the checkpointed data structure would be considered incompatible and a component should add its own handling if it needs to ensure backward compatibility of reading old-format checkpoint files. 
## Introduction This folder contains a framework & primitives, Checkpointing Manager, which is diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager.go b/pkg/kubelet/checkpointmanager/checkpoint_manager.go index 1d078357bff..f16e96044f2 100644 --- a/pkg/kubelet/checkpointmanager/checkpoint_manager.go +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager.go @@ -18,6 +18,7 @@ package checkpointmanager import ( "fmt" + "sync" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" @@ -48,8 +49,10 @@ type CheckpointManager interface { type impl struct { path string store utilstore.Store + mutex sync.Mutex } +// NewCheckpointManager returns a new instance of a checkpoint manager func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) { fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{}) if err != nil { @@ -88,6 +91,7 @@ func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) return err } +// RemoveCheckpoint will not return error if checkpoint does not exist. 
func (manager *impl) RemoveCheckpoint(checkpointKey string) error { manager.mutex.Lock() defer manager.mutex.Unlock() diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go index 4d02c4d5ac0..6081e90483a 100644 --- a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go @@ -18,12 +18,11 @@ package checkpointmanager import ( "encoding/json" - "hash/fnv" "sort" "testing" "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1" ) @@ -50,7 +49,7 @@ type CheckpointDataV2 struct { type protocol string // portMapping is the port mapping configurations of a sandbox. -type portMapping struct { +type PortMapping struct { // protocol of the port mapping. Protocol *protocol // Port number within the container. 
@@ -153,7 +152,7 @@ func TestCheckpointManager(t *testing.T) { port443 := int32(443) proto := protocol("tcp") - portMappings := []*portMapping{ + portMappings := []*PortMapping{ { &proto, &port80, diff --git a/pkg/kubelet/checkpointmanager/errors/errors.go b/pkg/kubelet/checkpointmanager/errors/errors.go index 25462ffd9c8..bb445f207ec 100644 --- a/pkg/kubelet/checkpointmanager/errors/errors.go +++ b/pkg/kubelet/checkpointmanager/errors/errors.go @@ -18,5 +18,8 @@ package errors import "fmt" -var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") -var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.") +// ErrCorruptCheckpoint error is reported when checksum does not match +var ErrCorruptCheckpoint = fmt.Errorf("checkpoint is corrupted") + +// ErrCheckpointNotFound is reported when checkpoint is not found for a given key +var ErrCheckpointNotFound = fmt.Errorf("checkpoint is not found") diff --git a/pkg/kubelet/checkpointmanager/testing/util.go b/pkg/kubelet/checkpointmanager/testing/util.go index 34bbb5303ef..8ca4af1d32f 100644 --- a/pkg/kubelet/checkpointmanager/testing/util.go +++ b/pkg/kubelet/checkpointmanager/testing/util.go @@ -27,10 +27,12 @@ type MemStore struct { sync.Mutex } +// NewMemStore returns an instance of MemStore func NewMemStore() *MemStore { return &MemStore{mem: make(map[string][]byte)} } +// Write writes the data to the store func (mstore *MemStore) Write(key string, data []byte) error { mstore.Lock() defer mstore.Unlock() @@ -38,6 +40,7 @@ func (mstore *MemStore) Write(key string, data []byte) error { return nil } +// Read returns data read from store func (mstore *MemStore) Read(key string) ([]byte, error) { mstore.Lock() defer mstore.Unlock() @@ -48,6 +51,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) { return data, nil } +// Delete deletes data from the store func (mstore *MemStore) Delete(key string) error { mstore.Lock() defer mstore.Unlock() @@ -55,6 +59,7 @@ func (mstore *MemStore) Delete(key 
string) error { return nil } +// List returns all the keys from the store func (mstore *MemStore) List() ([]string, error) { mstore.Lock() defer mstore.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 66b63999ae3..6c60e2c16b1 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -17,7 +17,6 @@ limitations under the License. package devicemanager import ( - "flag" "fmt" "io/ioutil" "os" @@ -548,7 +547,6 @@ type TestResource struct { } func TestPodContainerDeviceAllocation(t *testing.T) { - flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) res1 := TestResource{ resourceName: "domain1.com/resource1", resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), @@ -753,7 +751,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { as.Nil(err) testManager := &ManagerImpl{ callback: monitorCallback, - allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String), podDevices: make(podDevices), checkpointManager: ckm, diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 6dcb3a51a56..7df0e201c71 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -144,7 +144,12 @@ func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) continue } - data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp}) + data = append(data, checkpoint.PodDevicesEntry{ + PodUID: podUID, + ContainerName: conName, + ResourceName: resource, + DeviceIDs: devIds, + AllocResp: allocResp}) } } } diff --git a/pkg/kubelet/config/BUILD b/pkg/kubelet/config/BUILD index 7dd54bc3f4b..37e72d75e2a 100644 --- a/pkg/kubelet/config/BUILD +++ 
b/pkg/kubelet/config/BUILD @@ -58,6 +58,7 @@ go_library( "//pkg/apis/core/v1:go_default_library", "//pkg/apis/core/validation:go_default_library", "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index c03e32512af..6ffb448254b 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -64,7 +65,7 @@ type PodConfig struct { // contains the list of all configured sources sourcesLock sync.Mutex sources sets.String - checkpointManager checkpoint.Manager + checkpointManager checkpointmanager.CheckpointManager } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -114,10 +115,12 @@ func (c *PodConfig) Sync() { func (c *PodConfig) Restore(path string, updates chan<- interface{}) error { var err error if c.checkpointManager == nil { - c.checkpointManager = checkpoint.NewCheckpointManager(path) - pods, err := c.checkpointManager.LoadPods() - if err == nil { - updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path) + if err == nil { + pods, err := checkpoint.LoadPods(c.checkpointManager) + if err == nil { + updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + } } } return err diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 
fa8ab7d6a9c..ea7f430fdf5 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -155,7 +155,6 @@ go_test( "//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/dockershim/network:go_default_library", "//pkg/kubelet/dockershim/network/testing:go_default_library", - "//pkg/kubelet/dockershim/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/security/apparmor:go_default_library", diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 2b223389205..cc42ef5cabb 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -68,7 +68,7 @@ func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) { var keys []string - for key, _ := range ckm.checkpoint { + for key := range ckm.checkpoint { keys = append(keys, key) } return keys, nil diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9a1755ffe7f..87b8befcf14 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,6 +61,7 @@ import ( kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" @@ -553,8 +554,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.livenessManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() + var checkpointManager checkpointmanager.CheckpointManager + if bootstrapCheckpointPath != "" { + checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath) + if err != nil { + return nil, 
fmt.Errorf("failed to initialize checkpoint manager: %+v", err) + } + } // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. - klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager) + klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager) if remoteRuntimeEndpoint != "" { // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index be074af61a4..a3df9ae555f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -215,7 +215,7 @@ func newTestKubeletWithImageList( kubelet.secretManager = secretManager configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) kubelet.configMapManager = configMapManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager) + kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager()) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}) kubelet.containerRuntime = fakeRuntime diff --git a/pkg/kubelet/mountpod/mount_pod_test.go b/pkg/kubelet/mountpod/mount_pod_test.go index 3f84739fff9..bf7a0ec5602 100644 --- a/pkg/kubelet/mountpod/mount_pod_test.go +++ b/pkg/kubelet/mountpod/mount_pod_test.go @@ -52,7 +52,7 @@ func TestGetVolumeExec(t *testing.T) { fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) 
podManager.SetPods(pods) // Prepare fake /var/lib/kubelet diff --git a/pkg/kubelet/pod/BUILD b/pkg/kubelet/pod/BUILD index bba1a722c30..93e77dd955c 100644 --- a/pkg/kubelet/pod/BUILD +++ b/pkg/kubelet/pod/BUILD @@ -15,6 +15,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/pod", deps = [ "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/secret:go_default_library", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index be15616a9ea..6033ae8d50a 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" @@ -121,18 +122,18 @@ type basicManager struct { // basicManager is keeping secretManager and configMapManager up-to-date. secretManager secret.Manager configMapManager configmap.Manager - checkpointManager checkpoint.Manager + checkpointManager checkpointmanager.CheckpointManager // A mirror pod client to create/delete mirror pods. MirrorClient } // NewBasicPodManager returns a functional Manager. 
-func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager { +func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager { pm := &basicManager{} pm.secretManager = secretManager pm.configMapManager = configMapManager - pm.checkpointManager = checkpoint.GetInstance() + pm.checkpointManager = cpm pm.MirrorClient = client pm.SetPods(nil) return pm @@ -161,7 +162,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { defer pm.lock.Unlock() pm.updatePodsInternal(pod) if pm.checkpointManager != nil { - if err := pm.checkpointManager.WritePod(pod); err != nil { + if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil { glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName()) } } @@ -224,7 +225,7 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) { delete(pm.podByFullName, podFullName) } if pm.checkpointManager != nil { - if err := pm.checkpointManager.DeletePod(pod); err != nil { + if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil { glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName()) } } diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 557a5927f3e..0c24325bd6e 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() secretManager := secret.NewFakeManager() configMapManager := configmap.NewFakeManager() - manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager) + manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/pod/testing/BUILD b/pkg/kubelet/pod/testing/BUILD index 
b0364a226fe..28daeb0e36d 100644 --- a/pkg/kubelet/pod/testing/BUILD +++ b/pkg/kubelet/pod/testing/BUILD @@ -13,6 +13,8 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing", deps = [ + "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor/github.com/stretchr/testify/mock:go_default_library", diff --git a/pkg/kubelet/pod/testing/fake_mirror_client.go b/pkg/kubelet/pod/testing/fake_mirror_client.go index 7a7dd6ae639..a36a80a2c94 100644 --- a/pkg/kubelet/pod/testing/fake_mirror_client.go +++ b/pkg/kubelet/pod/testing/fake_mirror_client.go @@ -21,6 +21,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + cp "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -81,3 +83,37 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) { defer fmc.mirrorPodLock.RUnlock() return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName] } + +type MockCheckpointManager struct { + checkpoint map[string]*cp.Data +} + +func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data)) + return nil +} + +func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + *(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey]) + return nil +} + +func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { + _, ok := ckm.checkpoint[checkpointKey] + if ok { + delete(ckm.checkpoint, checkpointKey) + } + return nil +} + +func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) { + var keys []string + for key := range ckm.checkpoint { + keys = append(keys, key) + } + return keys, nil +} + +func 
NewMockCheckpointManager() checkpointmanager.CheckpointManager { + return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)} +} diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index f19ecd97b06..74cd10b4bff 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. - podManager := kubepod.NewBasicPodManager(nil, nil, nil) + podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil) // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) m := NewManager( diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index f05723e1c84..a36c0afde60 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) { } // Clean up. 
- m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 63b37ff2ab7..d8e943c1f75 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -63,7 +63,7 @@ func TestRunOnce(t *testing.T) { fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) fakeRuntime := &containertest.FakeRuntime{} basePath, err := utiltesting.MkTmpdir("kubelet") if err != nil { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 6e68b20d5ed..84ddec36e3b 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -75,7 +75,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager()) podManager.AddPod(getTestPod()) return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) } diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 
90c6bf182a5..1ecf9674dd8 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -523,7 +523,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr) fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr) diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 6e9b01fdb03..0d5af922627 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -55,7 +55,8 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, pv, claim := createObjects() kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -97,7 +98,8 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), 
secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, pv, claim := createObjects() claim.Status = v1.PersistentVolumeClaimStatus{ @@ -135,7 +137,8 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, _, claim := createObjects()