mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #63553 from rphillips/fixes/checkpoint_logic_on_restore
Automatic merge from submit-queue (batch tested with PRs 63151, 63795, 63553, 64068, 64113). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. kubelet: fix checkpoint manager logic bug on restore **What this PR does / why we need it**: I am testing the new checkpoint logic within the kubelet and ran across a logic bug on API server restores. Initial PR: https://github.com/kubernetes/kubernetes/pull/56040 **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: **Special notes for your reviewer**: /cc @vikaschoudhary16 **Release note**: ```release-note NONE ```
This commit is contained in:
commit
6935b755b9
@ -108,6 +108,8 @@ go_test(
|
|||||||
"//pkg/apis/core:go_default_library",
|
"//pkg/apis/core:go_default_library",
|
||||||
"//pkg/apis/core/v1:go_default_library",
|
"//pkg/apis/core/v1:go_default_library",
|
||||||
"//pkg/apis/core/validation:go_default_library",
|
"//pkg/apis/core/validation:go_default_library",
|
||||||
|
"//pkg/kubelet/checkpoint:go_default_library",
|
||||||
|
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||||
"//pkg/kubelet/types:go_default_library",
|
"//pkg/kubelet/types:go_default_library",
|
||||||
"//pkg/securitycontext:go_default_library",
|
"//pkg/securitycontext:go_default_library",
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
|
@ -113,17 +113,20 @@ func (c *PodConfig) Sync() {
|
|||||||
|
|
||||||
// Restore restores pods from the checkpoint path, *once*
|
// Restore restores pods from the checkpoint path, *once*
|
||||||
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
|
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
|
||||||
var err error
|
if c.checkpointManager != nil {
|
||||||
if c.checkpointManager == nil {
|
return nil
|
||||||
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
|
|
||||||
if err != nil {
|
|
||||||
pods, err := checkpoint.LoadPods(c.checkpointManager)
|
|
||||||
if err == nil {
|
|
||||||
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return err
|
var err error
|
||||||
|
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pods, err := checkpoint.LoadPods(c.checkpointManager)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// podStorage manages the current pod state at any point in time and ensures updates
|
// podStorage manages the current pod state at any point in time and ensures updates
|
||||||
@ -311,6 +314,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|||||||
}
|
}
|
||||||
case kubetypes.RESTORE:
|
case kubetypes.RESTORE:
|
||||||
glog.V(4).Infof("Restoring pods for source %s", source)
|
glog.V(4).Infof("Restoring pods for source %s", source)
|
||||||
|
for _, value := range update.Pods {
|
||||||
|
restorePods = append(restorePods, value)
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
glog.Warningf("Received invalid update type: %v", update)
|
glog.Warningf("Received invalid update type: %v", update)
|
||||||
|
@ -17,7 +17,9 @@ limitations under the License.
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -30,6 +32,9 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/core"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/securitycontext"
|
"k8s.io/kubernetes/pkg/securitycontext"
|
||||||
)
|
)
|
||||||
@ -85,6 +90,14 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
|
|||||||
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
|
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createPodConfigTesterByChannel(mode PodConfigNotificationMode, channelName string) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
|
||||||
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
|
||||||
|
channel := config.Channel(channelName)
|
||||||
|
ch := config.Updates()
|
||||||
|
return channel, ch, config
|
||||||
|
}
|
||||||
|
|
||||||
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
|
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
|
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
|
||||||
@ -413,3 +426,29 @@ func TestPodUpdateLabels(t *testing.T) {
|
|||||||
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPodRestore(t *testing.T) {
|
||||||
|
tmpDir, _ := ioutil.TempDir("", "")
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
pod := CreateValidPod("api-server", "kube-default")
|
||||||
|
pod.Annotations = make(map[string]string, 0)
|
||||||
|
pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource
|
||||||
|
pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true"
|
||||||
|
|
||||||
|
// Create Checkpointer
|
||||||
|
checkpointManager, err := checkpointmanager.NewCheckpointManager(tmpDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to initialize checkpoint manager: %v", err)
|
||||||
|
}
|
||||||
|
if err := checkpoint.WritePod(checkpointManager, pod); err != nil {
|
||||||
|
t.Fatalf("Error writing checkpoint for pod: %v", pod.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore checkpoint
|
||||||
|
channel, ch, config := createPodConfigTesterByChannel(PodConfigNotificationIncremental, kubetypes.ApiserverSource)
|
||||||
|
if err := config.Restore(tmpDir, channel); err != nil {
|
||||||
|
t.Fatalf("Restore returned error: %v", err)
|
||||||
|
}
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RESTORE, kubetypes.ApiserverSource, pod))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user