From ed4401c126838fa99b3e4cb13f9e57efec59d2c2 Mon Sep 17 00:00:00 2001 From: "Timothy St. Clair" Date: Sun, 29 Oct 2017 18:51:06 -0500 Subject: [PATCH] Addition of bootstrap checkpointing --- cmd/kubelet/app/options/options.go | 4 + cmd/kubelet/app/server.go | 9 +- pkg/apis/core/annotation_key_constants.go | 4 + pkg/kubelet/checkpoint/checkpoint.go | 151 ++++++++++++++++++++++ pkg/kubelet/checkpoint/checkpoint_test.go | 120 +++++++++++++++++ pkg/kubelet/config/config.go | 32 ++++- pkg/kubelet/kubelet.go | 50 +++++-- pkg/kubelet/pod/pod_manager.go | 19 ++- pkg/kubelet/types/pod_update.go | 2 + pkg/volume/util/util.go | 2 +- 10 files changed, 370 insertions(+), 23 deletions(-) create mode 100644 pkg/kubelet/checkpoint/checkpoint.go create mode 100644 pkg/kubelet/checkpoint/checkpoint_test.go diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 9b17a098729..4e9e10e9df4 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -157,6 +157,9 @@ type KubeletFlags struct { ExitOnLockContention bool // seccompProfileRoot is the directory path for seccomp profiles. SeccompProfileRoot string + // bootstrapCheckpointPath is the path to the directory containing pod checkpoints to + // run on restore + BootstrapCheckpointPath string // DEPRECATED FLAGS // minimumGCAge is the minimum age for a finished container before it is @@ -343,6 +346,7 @@ func (f *KubeletFlags) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&f.LockFilePath, "lock-file", f.LockFilePath, " The path to file for kubelet to use as a lock file.") fs.BoolVar(&f.ExitOnLockContention, "exit-on-lock-contention", f.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.") fs.StringVar(&f.SeccompProfileRoot, "seccomp-profile-root", f.SeccompProfileRoot, " Directory path for seccomp profiles.") + fs.StringVar(&f.BootstrapCheckpointPath, "bootstrap-checkpoint-path", f.BootstrapCheckpointPath, " Path to to the directory where the checkpoints are stored") // DEPRECATED FLAGS fs.DurationVar(&f.MinimumGCAge.Duration, "minimum-container-ttl-duration", f.MinimumGCAge.Duration, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 94dd2fc7316..8100b44afae 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -729,7 +729,8 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal. kubeFlags.NonMasqueradeCIDR, kubeFlags.KeepTerminatedPodVolumes, kubeFlags.NodeLabels, - kubeFlags.SeccompProfileRoot) + kubeFlags.SeccompProfileRoot, + kubeFlags.BootstrapCheckpointPath) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } @@ -802,7 +803,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nonMasqueradeCIDR string, keepTerminatedPodVolumes bool, nodeLabels map[string]string, - seccompProfileRoot string) (k kubelet.Bootstrap, err error) { + seccompProfileRoot string, + bootstrapCheckpointPath string) (k kubelet.Bootstrap, err error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations @@ -835,7 +837,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nonMasqueradeCIDR, keepTerminatedPodVolumes, nodeLabels, - seccompProfileRoot) + seccompProfileRoot, + bootstrapCheckpointPath) if err != nil { return nil, err } diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index c441a601b18..131fdd9905e 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -68,6 +68,10 @@ const ( // This annotation can be attached to node. ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl" + // BootstrapCheckpointAnnotationKey represents a Resource (Pod) that should be checkpointed by + // the kubelet prior to running + BootstrapCheckpointAnnotationKey string = "node.kubernetes.io/bootstrap-checkpoint" + // annotation key prefix used to identify non-convertible json paths. NonConvertibleAnnotationPrefix = "non-convertible.kubernetes.io" diff --git a/pkg/kubelet/checkpoint/checkpoint.go b/pkg/kubelet/checkpoint/checkpoint.go new file mode 100644 index 00000000000..d4b30f902d9 --- /dev/null +++ b/pkg/kubelet/checkpoint/checkpoint.go @@ -0,0 +1,151 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpoint + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/dchest/safefile" + "github.com/ghodss/yaml" + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + // Delimiter used on checkpoints written to disk + delimiter = "_" + podPrefix = "Pod" +) + +// Manager is the interface used to manage checkpoints +// which involves writing resources to disk to recover +// during restart or failure scenarios. +// https://github.com/kubernetes/community/pull/1241/files +type Manager interface { + // LoadPods will load checkpointed Pods from disk + LoadPods() ([]*v1.Pod, error) + + // WritePod will serialize a Pod to disk + WritePod(pod *v1.Pod) error + + // Deletes the checkpoint of the given pod from disk + DeletePod(pod *v1.Pod) error +} + +var instance Manager +var mutex = &sync.Mutex{} + +// fileCheckPointManager - is a checkpointer that writes contents to disk +// The type information of the resource objects are encoded in the name +type fileCheckPointManager struct { + path string +} + +// NewCheckpointManager will create a Manager that points to the following path +func NewCheckpointManager(path string) Manager { + // NOTE: This is a precaution; current implementation should not run + // multiple checkpoint managers. + mutex.Lock() + defer mutex.Unlock() + instance = &fileCheckPointManager{path: path} + return instance +} + +// GetInstance will return the current Manager, there should be only one. +func GetInstance() Manager { + mutex.Lock() + defer mutex.Unlock() + return instance +} + +// loadPod will load Pod Checkpoint yaml file. +func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) { + return util.LoadPodFromFile(file) +} + +// checkAnnotations will validate the checkpoint annotations exist on the Pod +func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { + if podAnnotations := pod.GetAnnotations(); podAnnotations != nil { + if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" { + return true + } + } + return false +} + +// getPodPath returns the full qualified path for the pod checkpoint +func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string { + return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID()) +} + +// LoadPods Loads All Checkpoints from disk +func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) { + checkpoints := make([]*v1.Pod, 0) + files, err := ioutil.ReadDir(fcp.path) + if err != nil { + return nil, err + } + for _, f := range files { + // get just the filename + _, fname := filepath.Split(f.Name()) + // Get just the Resource from "Resource_Name" + fnfields := strings.Split(fname, delimiter) + switch fnfields[0] { + case podPrefix: + pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name())) + if err != nil { + return nil, err + } + checkpoints = append(checkpoints, pod) + default: + glog.Warningf("Unsupported checkpoint file detected %v", f) + } + } + return checkpoints, nil +} + +// Writes a checkpoint to a file on disk if annotation is present +func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error { + var err error + if fcp.checkAnnotations(pod) { + if blob, err := yaml.Marshal(pod); err == nil { + err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644) + } + } else { + // This is to handle an edge where a pod update could remove + // an annotation and the checkpoint should then be removed. + err = fcp.DeletePod(pod) + } + return err +} + +// Deletes a checkpoint from disk if present +func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error { + podPath := fcp.getPodPath(pod) + if err := os.Remove(podPath); !os.IsNotExist(err) { + return err + } + return nil +} diff --git a/pkg/kubelet/checkpoint/checkpoint_test.go b/pkg/kubelet/checkpoint/checkpoint_test.go new file mode 100644 index 00000000000..23c35c2c019 --- /dev/null +++ b/pkg/kubelet/checkpoint/checkpoint_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpoint + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/core" +) + +// TestWriteLoadDeletePods validates all combinations of write, load, and delete +func TestWriteLoadDeletePods(t *testing.T) { + testPods := []struct { + pod *v1.Pod + written bool + }{ + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Foo", + Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"}, + UID: "1", + }, + }, + written: true, + }, + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Foo2", + Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"}, + UID: "2", + }, + }, + written: true, + }, + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Bar", + UID: "3", + }, + }, + written: false, + }, + } + + dir, err := ioutil.TempDir("", "checkpoint") + if err != nil { + t.Errorf("Failed to allocate temp directory for TestWriteLoadDeletePods error=%v", err) + } + defer os.RemoveAll(dir) + + cp := NewCheckpointManager(dir) + for _, p := range testPods { + // Write pods should always pass unless there is an fs error + if err := cp.WritePod(p.pod); err != nil { + t.Errorf("Failed to Write Pod: %v", err) + } + } + // verify the correct written files are loaded from disk + pods, err := cp.LoadPods() + if err != nil { + t.Errorf("Failed to Load Pods: %v", err) + } + // loop through contents and check make sure + // what was loaded matched the expected results. + for _, p := range testPods { + pname := p.pod.GetName() + var lpod *v1.Pod + for _, check := range pods { + if check.GetName() == pname { + lpod = check + break + } + } + if p.written { + if lpod != nil { + if !reflect.DeepEqual(p.pod, lpod) { + t.Errorf("expected %#v, \ngot %#v", p.pod, lpod) + } + } else { + t.Errorf("Got unexpected result for %v, should have been loaded", pname) + } + } else if lpod != nil { + t.Errorf("Got unexpected result for %v, should not have been loaded", pname) + } + err = cp.DeletePod(p.pod) + if err != nil { + t.Errorf("Failed to delete pod %v", pname) + } + } + // finally validate the contents of the directory is empty. + files, err := ioutil.ReadDir(dir) + if err != nil { + t.Errorf("Failed to read directory %v", dir) + } + if len(files) > 0 { + t.Errorf("Directory %v should be empty but found %#v", dir, files) + } +} diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index ca322d45f3c..d131c6ce395 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/kubelet/checkpoint" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -61,8 +62,9 @@ type PodConfig struct { updates chan kubetypes.PodUpdate // contains the list of all configured sources - sourcesLock sync.Mutex - sources sets.String + sourcesLock sync.Mutex + sources sets.String + checkpointManager checkpoint.Manager } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -108,6 +110,19 @@ func (c *PodConfig) Sync() { c.pods.Sync() } +// Restore restores pods from the checkpoint path, *once* +func (c *PodConfig) Restore(path string, updates chan<- interface{}) error { + var err error + if c.checkpointManager == nil { + c.checkpointManager = checkpoint.NewCheckpointManager(path) + pods, err := c.checkpointManager.LoadPods() + if err == nil { + updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + } + } + return err +} + // podStorage manages the current pod state at any point in time and ensures updates // to the channel are delivered in order. Note that this object is an in-memory source of // "truth" and on creation contains zero entries. Once all previously read sources are @@ -152,7 +167,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes, removes, reconciles := s.merge(source, change) + adds, updates, deletes, removes, reconciles, restores := s.merge(source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications @@ -170,6 +185,9 @@ func (s *podStorage) Merge(source string, change interface{}) error { if len(deletes.Pods) > 0 { s.updates <- *deletes } + if len(restores.Pods) > 0 { + s.updates <- *restores + } if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { // Send an empty update when first seeing the source and there are // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that @@ -206,7 +224,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { +func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() @@ -215,6 +233,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de deletePods := []*v1.Pod{} removePods := []*v1.Pod{} reconcilePods := []*v1.Pod{} + restorePods := []*v1.Pod{} pods := s.pods[source] if pods == nil { @@ -287,6 +306,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de removePods = append(removePods, existing) } } + case kubetypes.RESTORE: + glog.V(4).Infof("Restoring pods for source %s", source) default: glog.Warningf("Received invalid update type: %v", update) @@ -300,8 +321,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source} removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source} reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source} + restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source} - return adds, updates, deletes, removes, reconciles + return adds, updates, deletes, removes, reconciles, restores } func (s *podStorage) markSourceSet(source string) { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 025340803b1..27453899465 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -218,7 +218,8 @@ type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nonMasqueradeCIDR string, keepTerminatedPodVolumes bool, nodeLabels map[string]string, - seccompProfileRoot string) (Bootstrap, error) + seccompProfileRoot string, + bootstrapCheckpointPath string) (Bootstrap, error) // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping @@ -271,7 +272,7 @@ type Dependencies struct { // makePodSourceConfig creates a config.PodConfig from the given // KubeletConfiguration or returns an error. -func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) { +func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) { manifestURLHeader := make(http.Header) if len(kubeCfg.ManifestURLHeader) > 0 { for k, v := range kubeCfg.ManifestURLHeader { @@ -286,7 +287,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku // define file config source if kubeCfg.PodManifestPath != "" { - glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath) + glog.Infof("Adding manifest path: %v", kubeCfg.PodManifestPath) config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource)) } @@ -295,9 +296,22 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader) config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource)) } + + // Restore from the checkpoint path + // NOTE: This MUST happen before creating the apiserver source + // below, or the checkpoint would override the source of truth. + updatechannel := cfg.Channel(kubetypes.ApiserverSource) + if bootstrapCheckpointPath != "" { + glog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath) + err := cfg.Restore(bootstrapCheckpointPath, updatechannel) + if err != nil { + return nil, err + } + } + if kubeDeps.KubeClient != nil { glog.Infof("Watching apiserver") - config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource)) + config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel) } return cfg, nil } @@ -345,7 +359,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nonMasqueradeCIDR string, keepTerminatedPodVolumes bool, nodeLabels map[string]string, - seccompProfileRoot string) (*Kubelet, error) { + seccompProfileRoot string, + bootstrapCheckpointPath string) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -406,7 +421,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if kubeDeps.PodConfig == nil { var err error - kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName) + kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath) if err != nil { return nil, err } @@ -1837,17 +1852,28 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) // DELETE is treated as a UPDATE because of graceful deletion. handler.HandlePodUpdates(u.Pods) + case kubetypes.RESTORE: + glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods)) + // These are pods restored from the checkpoint. Treat them as new + // pods. + handler.HandlePodAdditions(u.Pods) case kubetypes.SET: // TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") } - // Mark the source ready after receiving at least one update from the - // source. Once all the sources are marked ready, various cleanup - // routines will start reclaiming resources. It is important that this - // takes place only after kubelet calls the update handler to process - // the update to ensure the internal pod cache is up-to-date. - kl.sourcesReady.AddSource(u.Source) + if u.Op != kubetypes.RESTORE { + // If the update type is RESTORE, it means that the update is from + // the pod checkpoints and may be incomplete. Do not mark the + // source as ready. + + // Mark the source ready after receiving at least one update from the + // source. Once all the sources are marked ready, various cleanup + // routines will start reclaiming resources. It is important that this + // takes place only after kubelet calls the update handler to process + // the update to ensure the internal pod cache is up-to-date. + kl.sourcesReady.AddSource(u.Source) + } case e := <-plegCh: if isSyncPodWorthy(e) { // PLEG event for a pod; sync it. diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 20940a7dc5b..be15616a9ea 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -19,8 +19,11 @@ package pod import ( "sync" + "github.com/golang/glog" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/kubelet/checkpoint" "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" @@ -116,8 +119,9 @@ type basicManager struct { translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID // basicManager is keeping secretManager and configMapManager up-to-date. - secretManager secret.Manager - configMapManager configmap.Manager + secretManager secret.Manager + configMapManager configmap.Manager + checkpointManager checkpoint.Manager // A mirror pod client to create/delete mirror pods. MirrorClient @@ -128,6 +132,7 @@ func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, confi pm := &basicManager{} pm.secretManager = secretManager pm.configMapManager = configMapManager + pm.checkpointManager = checkpoint.GetInstance() pm.MirrorClient = client pm.SetPods(nil) return pm @@ -155,6 +160,11 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { pm.lock.Lock() defer pm.lock.Unlock() pm.updatePodsInternal(pod) + if pm.checkpointManager != nil { + if err := pm.checkpointManager.WritePod(pod); err != nil { + glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName()) + } + } } // updatePodsInternal replaces the given pods in the current state of the @@ -213,6 +223,11 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) { delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID)) delete(pm.podByFullName, podFullName) } + if pm.checkpointManager != nil { + if err := pm.checkpointManager.DeletePod(pod); err != nil { + glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName()) + } + } } func (pm *basicManager) GetPods() []*v1.Pod { diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go index 7d1f481dc31..6bfc3dac63f 100644 --- a/pkg/kubelet/types/pod_update.go +++ b/pkg/kubelet/types/pod_update.go @@ -49,6 +49,8 @@ const ( // Pods with the given ids have unexpected status in this source, // kubelet should reconcile status with this source RECONCILE + // Pods with the given ids have been restored from a checkpoint. + RESTORE // These constants identify the sources of pods // Updates from a file diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go index 2bd4fc89c28..4fd236b20f2 100644 --- a/pkg/volume/util/util.go +++ b/pkg/volume/util/util.go @@ -233,7 +233,7 @@ func LoadPodFromFile(filePath string) (*v1.Pod, error) { } pod := &v1.Pod{} - codec := legacyscheme.Codecs.LegacyCodec(legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion) + codec := legacyscheme.Codecs.UniversalDecoder() if err := runtime.DecodeInto(codec, podDef, pod); err != nil { return nil, fmt.Errorf("failed decoding file: %v", err) }