Addition of bootstrap checkpointing

This commit is contained in:
Timothy St. Clair 2017-10-29 18:51:06 -05:00
parent 849d7f8595
commit ed4401c126
10 changed files with 370 additions and 23 deletions

View File

@ -157,6 +157,9 @@ type KubeletFlags struct {
ExitOnLockContention bool ExitOnLockContention bool
// seccompProfileRoot is the directory path for seccomp profiles. // seccompProfileRoot is the directory path for seccomp profiles.
SeccompProfileRoot string SeccompProfileRoot string
// bootstrapCheckpointPath is the path to the directory containing pod checkpoints to
// run on restore
BootstrapCheckpointPath string
// DEPRECATED FLAGS // DEPRECATED FLAGS
// minimumGCAge is the minimum age for a finished container before it is // minimumGCAge is the minimum age for a finished container before it is
@ -343,6 +346,7 @@ func (f *KubeletFlags) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&f.LockFilePath, "lock-file", f.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.") fs.StringVar(&f.LockFilePath, "lock-file", f.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
fs.BoolVar(&f.ExitOnLockContention, "exit-on-lock-contention", f.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.") fs.BoolVar(&f.ExitOnLockContention, "exit-on-lock-contention", f.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.")
fs.StringVar(&f.SeccompProfileRoot, "seccomp-profile-root", f.SeccompProfileRoot, "<Warning: Alpha feature> Directory path for seccomp profiles.") fs.StringVar(&f.SeccompProfileRoot, "seccomp-profile-root", f.SeccompProfileRoot, "<Warning: Alpha feature> Directory path for seccomp profiles.")
fs.StringVar(&f.BootstrapCheckpointPath, "bootstrap-checkpoint-path", f.BootstrapCheckpointPath, "<Warning: Alpha feature> Path to to the directory where the checkpoints are stored")
// DEPRECATED FLAGS // DEPRECATED FLAGS
fs.DurationVar(&f.MinimumGCAge.Duration, "minimum-container-ttl-duration", f.MinimumGCAge.Duration, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") fs.DurationVar(&f.MinimumGCAge.Duration, "minimum-container-ttl-duration", f.MinimumGCAge.Duration, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")

View File

@ -729,7 +729,8 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.
kubeFlags.NonMasqueradeCIDR, kubeFlags.NonMasqueradeCIDR,
kubeFlags.KeepTerminatedPodVolumes, kubeFlags.KeepTerminatedPodVolumes,
kubeFlags.NodeLabels, kubeFlags.NodeLabels,
kubeFlags.SeccompProfileRoot) kubeFlags.SeccompProfileRoot,
kubeFlags.BootstrapCheckpointPath)
if err != nil { if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err) return fmt.Errorf("failed to create kubelet: %v", err)
} }
@ -802,7 +803,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nonMasqueradeCIDR string, nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool, keepTerminatedPodVolumes bool,
nodeLabels map[string]string, nodeLabels map[string]string,
seccompProfileRoot string) (k kubelet.Bootstrap, err error) { seccompProfileRoot string,
bootstrapCheckpointPath string) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations // up into "per source" synchronizations
@ -835,7 +837,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nonMasqueradeCIDR, nonMasqueradeCIDR,
keepTerminatedPodVolumes, keepTerminatedPodVolumes,
nodeLabels, nodeLabels,
seccompProfileRoot) seccompProfileRoot,
bootstrapCheckpointPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -68,6 +68,10 @@ const (
// This annotation can be attached to node. // This annotation can be attached to node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl" ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
// BootstrapCheckpointAnnotationKey represents a Resource (Pod) that should be checkpointed by
// the kubelet prior to running
BootstrapCheckpointAnnotationKey string = "node.kubernetes.io/bootstrap-checkpoint"
// annotation key prefix used to identify non-convertible json paths. // annotation key prefix used to identify non-convertible json paths.
NonConvertibleAnnotationPrefix = "non-convertible.kubernetes.io" NonConvertibleAnnotationPrefix = "non-convertible.kubernetes.io"

View File

@ -0,0 +1,151 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/dchest/safefile"
"github.com/ghodss/yaml"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/volume/util"
)
const (
// Delimiter used on checkpoints written to disk
delimiter = "_"
podPrefix = "Pod"
)
// Manager is the interface used to manage checkpoints
// which involves writing resources to disk to recover
// during restart or failure scenarios.
// https://github.com/kubernetes/community/pull/1241/files
type Manager interface {
// LoadPods will load checkpointed Pods from disk
LoadPods() ([]*v1.Pod, error)
// WritePod will serialize a Pod to disk
WritePod(pod *v1.Pod) error
// Deletes the checkpoint of the given pod from disk
DeletePod(pod *v1.Pod) error
}
var instance Manager
var mutex = &sync.Mutex{}
// fileCheckPointManager - is a checkpointer that writes contents to disk
// The type information of the resource objects are encoded in the name
type fileCheckPointManager struct {
path string
}
// NewCheckpointManager will create a Manager that points to the following path
func NewCheckpointManager(path string) Manager {
// NOTE: This is a precaution; current implementation should not run
// multiple checkpoint managers.
mutex.Lock()
defer mutex.Unlock()
instance = &fileCheckPointManager{path: path}
return instance
}
// GetInstance will return the current Manager, there should be only one.
func GetInstance() Manager {
mutex.Lock()
defer mutex.Unlock()
return instance
}
// loadPod will load Pod Checkpoint yaml file.
func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) {
return util.LoadPodFromFile(file)
}
// checkAnnotations will validate the checkpoint annotations exist on the Pod
func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
return true
}
}
return false
}
// getPodPath returns the full qualified path for the pod checkpoint
func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string {
return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID())
}
// LoadPods Loads All Checkpoints from disk
func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) {
checkpoints := make([]*v1.Pod, 0)
files, err := ioutil.ReadDir(fcp.path)
if err != nil {
return nil, err
}
for _, f := range files {
// get just the filename
_, fname := filepath.Split(f.Name())
// Get just the Resource from "Resource_Name"
fnfields := strings.Split(fname, delimiter)
switch fnfields[0] {
case podPrefix:
pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name()))
if err != nil {
return nil, err
}
checkpoints = append(checkpoints, pod)
default:
glog.Warningf("Unsupported checkpoint file detected %v", f)
}
}
return checkpoints, nil
}
// Writes a checkpoint to a file on disk if annotation is present
func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error {
var err error
if fcp.checkAnnotations(pod) {
if blob, err := yaml.Marshal(pod); err == nil {
err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644)
}
} else {
// This is to handle an edge where a pod update could remove
// an annotation and the checkpoint should then be removed.
err = fcp.DeletePod(pod)
}
return err
}
// Deletes a checkpoint from disk if present
func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error {
podPath := fcp.getPodPath(pod)
if err := os.Remove(podPath); !os.IsNotExist(err) {
return err
}
return nil
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"io/ioutil"
"os"
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)
// TestWriteLoadDeletePods validates all combinations of write, load, and delete
func TestWriteLoadDeletePods(t *testing.T) {
testPods := []struct {
pod *v1.Pod
written bool
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Foo",
Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
UID: "1",
},
},
written: true,
},
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Foo2",
Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
UID: "2",
},
},
written: true,
},
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Bar",
UID: "3",
},
},
written: false,
},
}
dir, err := ioutil.TempDir("", "checkpoint")
if err != nil {
t.Errorf("Failed to allocate temp directory for TestWriteLoadDeletePods error=%v", err)
}
defer os.RemoveAll(dir)
cp := NewCheckpointManager(dir)
for _, p := range testPods {
// Write pods should always pass unless there is an fs error
if err := cp.WritePod(p.pod); err != nil {
t.Errorf("Failed to Write Pod: %v", err)
}
}
// verify the correct written files are loaded from disk
pods, err := cp.LoadPods()
if err != nil {
t.Errorf("Failed to Load Pods: %v", err)
}
// loop through contents and check make sure
// what was loaded matched the expected results.
for _, p := range testPods {
pname := p.pod.GetName()
var lpod *v1.Pod
for _, check := range pods {
if check.GetName() == pname {
lpod = check
break
}
}
if p.written {
if lpod != nil {
if !reflect.DeepEqual(p.pod, lpod) {
t.Errorf("expected %#v, \ngot %#v", p.pod, lpod)
}
} else {
t.Errorf("Got unexpected result for %v, should have been loaded", pname)
}
} else if lpod != nil {
t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
}
err = cp.DeletePod(p.pod)
if err != nil {
t.Errorf("Failed to delete pod %v", pname)
}
}
// finally validate the contents of the directory is empty.
files, err := ioutil.ReadDir(dir)
if err != nil {
t.Errorf("Failed to read directory %v", dir)
}
if len(files) > 0 {
t.Errorf("Directory %v should be empty but found %#v", dir, files)
}
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -61,8 +62,9 @@ type PodConfig struct {
updates chan kubetypes.PodUpdate updates chan kubetypes.PodUpdate
// contains the list of all configured sources // contains the list of all configured sources
sourcesLock sync.Mutex sourcesLock sync.Mutex
sources sets.String sources sets.String
checkpointManager checkpoint.Manager
} }
// NewPodConfig creates an object that can merge many configuration sources into a stream // NewPodConfig creates an object that can merge many configuration sources into a stream
@ -108,6 +110,19 @@ func (c *PodConfig) Sync() {
c.pods.Sync() c.pods.Sync()
} }
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
var err error
if c.checkpointManager == nil {
c.checkpointManager = checkpoint.NewCheckpointManager(path)
pods, err := c.checkpointManager.LoadPods()
if err == nil {
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
}
}
return err
}
// podStorage manages the current pod state at any point in time and ensures updates // podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of // to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are // "truth" and on creation contains zero entries. Once all previously read sources are
@ -152,7 +167,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
defer s.updateLock.Unlock() defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source) seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(source, change) adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source) firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications // deliver update notifications
@ -170,6 +185,9 @@ func (s *podStorage) Merge(source string, change interface{}) error {
if len(deletes.Pods) > 0 { if len(deletes.Pods) > 0 {
s.updates <- *deletes s.updates <- *deletes
} }
if len(restores.Pods) > 0 {
s.updates <- *restores
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are // Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
@ -206,7 +224,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil return nil
} }
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
s.podLock.Lock() s.podLock.Lock()
defer s.podLock.Unlock() defer s.podLock.Unlock()
@ -215,6 +233,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
deletePods := []*v1.Pod{} deletePods := []*v1.Pod{}
removePods := []*v1.Pod{} removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{} reconcilePods := []*v1.Pod{}
restorePods := []*v1.Pod{}
pods := s.pods[source] pods := s.pods[source]
if pods == nil { if pods == nil {
@ -287,6 +306,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
removePods = append(removePods, existing) removePods = append(removePods, existing)
} }
} }
case kubetypes.RESTORE:
glog.V(4).Infof("Restoring pods for source %s", source)
default: default:
glog.Warningf("Received invalid update type: %v", update) glog.Warningf("Received invalid update type: %v", update)
@ -300,8 +321,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source} deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source} removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source} reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}
return adds, updates, deletes, removes, reconciles return adds, updates, deletes, removes, reconciles, restores
} }
func (s *podStorage) markSourceSet(source string) { func (s *podStorage) markSourceSet(source string) {

View File

@ -218,7 +218,8 @@ type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nonMasqueradeCIDR string, nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool, keepTerminatedPodVolumes bool,
nodeLabels map[string]string, nodeLabels map[string]string,
seccompProfileRoot string) (Bootstrap, error) seccompProfileRoot string,
bootstrapCheckpointPath string) (Bootstrap, error)
// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
@ -271,7 +272,7 @@ type Dependencies struct {
// makePodSourceConfig creates a config.PodConfig from the given // makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error. // KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) { func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header) manifestURLHeader := make(http.Header)
if len(kubeCfg.ManifestURLHeader) > 0 { if len(kubeCfg.ManifestURLHeader) > 0 {
for k, v := range kubeCfg.ManifestURLHeader { for k, v := range kubeCfg.ManifestURLHeader {
@ -286,7 +287,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// define file config source // define file config source
if kubeCfg.PodManifestPath != "" { if kubeCfg.PodManifestPath != "" {
glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath) glog.Infof("Adding manifest path: %v", kubeCfg.PodManifestPath)
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource)) config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
} }
@ -295,9 +296,22 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader) glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource)) config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
} }
// Restore from the checkpoint path
// NOTE: This MUST happen before creating the apiserver source
// below, or the checkpoint would override the source of truth.
updatechannel := cfg.Channel(kubetypes.ApiserverSource)
if bootstrapCheckpointPath != "" {
glog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
if err != nil {
return nil, err
}
}
if kubeDeps.KubeClient != nil { if kubeDeps.KubeClient != nil {
glog.Infof("Watching apiserver") glog.Infof("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource)) config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
} }
return cfg, nil return cfg, nil
} }
@ -345,7 +359,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nonMasqueradeCIDR string, nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool, keepTerminatedPodVolumes bool,
nodeLabels map[string]string, nodeLabels map[string]string,
seccompProfileRoot string) (*Kubelet, error) { seccompProfileRoot string,
bootstrapCheckpointPath string) (*Kubelet, error) {
if rootDirectory == "" { if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory) return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
} }
@ -406,7 +421,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeDeps.PodConfig == nil { if kubeDeps.PodConfig == nil {
var err error var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName) kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1837,17 +1852,28 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion. // DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods) handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET: case kubetypes.SET:
// TODO: Do we want to support this? // TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update") glog.Errorf("Kubelet does not support snapshot update")
} }
// Mark the source ready after receiving at least one update from the if u.Op != kubetypes.RESTORE {
// source. Once all the sources are marked ready, various cleanup // If the update type is RESTORE, it means that the update is from
// routines will start reclaiming resources. It is important that this // the pod checkpoints and may be incomplete. Do not mark the
// takes place only after kubelet calls the update handler to process // source as ready.
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source) // Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
}
case e := <-plegCh: case e := <-plegCh:
if isSyncPodWorthy(e) { if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it. // PLEG event for a pod; sync it.

View File

@ -19,8 +19,11 @@ package pod
import ( import (
"sync" "sync"
"github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/configmap" "k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/secret"
@ -116,8 +119,9 @@ type basicManager struct {
translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
// basicManager is keeping secretManager and configMapManager up-to-date. // basicManager is keeping secretManager and configMapManager up-to-date.
secretManager secret.Manager secretManager secret.Manager
configMapManager configmap.Manager configMapManager configmap.Manager
checkpointManager checkpoint.Manager
// A mirror pod client to create/delete mirror pods. // A mirror pod client to create/delete mirror pods.
MirrorClient MirrorClient
@ -128,6 +132,7 @@ func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, confi
pm := &basicManager{} pm := &basicManager{}
pm.secretManager = secretManager pm.secretManager = secretManager
pm.configMapManager = configMapManager pm.configMapManager = configMapManager
pm.checkpointManager = checkpoint.GetInstance()
pm.MirrorClient = client pm.MirrorClient = client
pm.SetPods(nil) pm.SetPods(nil)
return pm return pm
@ -155,6 +160,11 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
pm.lock.Lock() pm.lock.Lock()
defer pm.lock.Unlock() defer pm.lock.Unlock()
pm.updatePodsInternal(pod) pm.updatePodsInternal(pod)
if pm.checkpointManager != nil {
if err := pm.checkpointManager.WritePod(pod); err != nil {
glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
}
}
} }
// updatePodsInternal replaces the given pods in the current state of the // updatePodsInternal replaces the given pods in the current state of the
@ -213,6 +223,11 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID)) delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
delete(pm.podByFullName, podFullName) delete(pm.podByFullName, podFullName)
} }
if pm.checkpointManager != nil {
if err := pm.checkpointManager.DeletePod(pod); err != nil {
glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
}
}
} }
func (pm *basicManager) GetPods() []*v1.Pod { func (pm *basicManager) GetPods() []*v1.Pod {

View File

@ -49,6 +49,8 @@ const (
// Pods with the given ids have unexpected status in this source, // Pods with the given ids have unexpected status in this source,
// kubelet should reconcile status with this source // kubelet should reconcile status with this source
RECONCILE RECONCILE
// Pods with the given ids have been restored from a checkpoint.
RESTORE
// These constants identify the sources of pods // These constants identify the sources of pods
// Updates from a file // Updates from a file

View File

@ -233,7 +233,7 @@ func LoadPodFromFile(filePath string) (*v1.Pod, error) {
} }
pod := &v1.Pod{} pod := &v1.Pod{}
codec := legacyscheme.Codecs.LegacyCodec(legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion) codec := legacyscheme.Codecs.UniversalDecoder()
if err := runtime.DecodeInto(codec, podDef, pod); err != nil { if err := runtime.DecodeInto(codec, podDef, pod); err != nil {
return nil, fmt.Errorf("failed decoding file: %v", err) return nil, fmt.Errorf("failed decoding file: %v", err)
} }