Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-25 12:43:23 +00:00
Merge pull request #51146 from mtaufen/remove-crashloop-detection
Automatic merge from submit-queue

Remove crash loop "detection" from the dynamic kubelet config feature

**What this PR does / why we need it**: The subfeature was a cool idea, but in the end it is very complex to separate Kubelet restarts into crash loops caused by config vs. crash loops caused by other phenomena, like admin-triggered node restarts, kernel panics, and process-babysitter behavior. Dynamic kubelet config will be better off without the potential for false positives here.

Removing this subfeature also simplifies dynamic configuration by reducing persistent state:
- we no longer need to track bad config in a file
- we no longer need to track Kubelet startups in a file

**Which issue this PR fixes**: fixes #50216

**Release note**:
```release-note
NONE
```
This commit is contained in commit 73a6ee1dcc
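For orientation, here is a minimal sketch (not code from this commit) of the bootstrap flow the diff below converges on: with crash-loop detection removed, the only persistent state consulted is the checkpoint store itself — a config that fails to load or validate rolls back to the last-known-good, and a config that survives its trial period simply graduates to last-known-good. Names such as `config`, `store`, and `bootstrap` are hypothetical stand-ins; the real logic lives in `pkg/kubelet/kubeletconfig/controller.go`.

```go
// Illustrative sketch only; assumes simplified stand-ins for the checkpoint
// store and KubeletConfiguration used by the dynamic config controller.
package main

import (
	"errors"
	"fmt"
	"time"
)

// config stands in for a checkpointed KubeletConfiguration.
type config struct {
	uid           string
	trialDuration time.Duration
	valid         bool
}

// store stands in for the checkpoint store: it tracks only the current and
// last-known-good configs plus when the current config was last modified.
type store struct {
	current, lastKnownGood *config
	currentModified        time.Time
}

// bootstrap picks the config to run with. No startup counting and no
// bad-config records: a validation failure rolls back immediately, and a
// config whose trial period has elapsed becomes the last-known-good.
func bootstrap(s *store) (*config, error) {
	cur := s.current
	if cur == nil {
		return nil, errors.New("no current config")
	}
	if !cur.valid {
		// roll back to last-known-good instead of marking the config "bad"
		if s.lastKnownGood == nil {
			return nil, fmt.Errorf("config %q invalid and no last-known-good", cur.uid)
		}
		return s.lastKnownGood, nil
	}
	if time.Since(s.currentModified) >= cur.trialDuration {
		// trial period over: current graduates to last-known-good
		s.lastKnownGood = cur
	}
	return cur, nil
}

func main() {
	s := &store{
		current:         &config{uid: "abc", trialDuration: 10 * time.Minute, valid: true},
		currentModified: time.Now().Add(-time.Hour),
	}
	c, err := bootstrap(s)
	fmt.Println(c.uid, s.lastKnownGood != nil, err)
}
```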
@@ -56,17 +56,8 @@ type KubeletConfiguration struct {
metav1.TypeMeta

// Only used for dynamic configuration.
// The length of the trial period for this configuration. If the Kubelet records CrashLoopThreshold or
// more startups during this period, the current configuration will be marked bad and the
// Kubelet will roll-back to the last-known-good. Default 10 minutes.
ConfigTrialDuration metav1.Duration
// Only used for dynamic configuration.
// If this number of Kubelet "crashes" during ConfigTrialDuration meets this threshold,
// the configuration fails the trial and the Kubelet rolls back to its last-known-good config.
// Crash-loops are detected by counting Kubelet startups, so one startup is implicitly added
// to this threshold to always allow a single restart per config change.
// Default 10, mimimum allowed is 0, maximum allowed is 10.
CrashLoopThreshold int32
// The length of the trial period for this configuration. This configuration will become the last-known-good after this duration.
ConfigTrialDuration *metav1.Duration
// podManifestPath is the path to the directory containing pod manifests to
// run, or the path to a single manifest file
PodManifestPath string
@@ -53,9 +53,6 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.ConfigTrialDuration == nil {
obj.ConfigTrialDuration = &metav1.Duration{Duration: 10 * time.Minute}
}
if obj.CrashLoopThreshold == nil {
obj.CrashLoopThreshold = utilpointer.Int32Ptr(10)
}
if obj.Authentication.Anonymous.Enabled == nil {
obj.Authentication.Anonymous.Enabled = boolVar(true)
}
@@ -51,17 +51,8 @@ type KubeletConfiguration struct {
metav1.TypeMeta `json:",inline"`

// Only used for dynamic configuration.
// The length of the trial period for this configuration. If the Kubelet records CrashLoopThreshold or
// more startups during this period, the current configuration will be marked bad and the
// Kubelet will roll-back to the last-known-good. Default 10 minutes.
// The length of the trial period for this configuration. This configuration will become the last-known-good after this duration.
ConfigTrialDuration *metav1.Duration `json:"configTrialDuration"`
// Only used for dynamic configuration.
// If this number of Kubelet "crashes" during ConfigTrialDuration meets this threshold,
// the configuration fails the trial and the Kubelet rolls back to its last-known-good config.
// Crash-loops are detected by counting Kubelet startups, so one startup is implicitly added
// to this threshold to always allow a single restart per config change.
// Default 10, mimimum allowed is 0, maximum allowed is 10.
CrashLoopThreshold *int32 `json:"crashLoopThreshold"`
// podManifestPath is the path to the directory containing pod manifests to
// run, or the path to a single manifest file
PodManifestPath string `json:"podManifestPath"`
@@ -142,12 +142,7 @@ func Convert_kubeletconfig_KubeletAuthorization_To_v1alpha1_KubeletAuthorization
}

func autoConvert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfiguration(in *KubeletConfiguration, out *kubeletconfig.KubeletConfiguration, s conversion.Scope) error {
if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.ConfigTrialDuration, &out.ConfigTrialDuration, s); err != nil {
return err
}
if err := v1.Convert_Pointer_int32_To_int32(&in.CrashLoopThreshold, &out.CrashLoopThreshold, s); err != nil {
return err
}
out.ConfigTrialDuration = (*v1.Duration)(unsafe.Pointer(in.ConfigTrialDuration))
out.PodManifestPath = in.PodManifestPath
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
@@ -306,12 +301,7 @@ func Convert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfiguration
}

func autoConvert_kubeletconfig_KubeletConfiguration_To_v1alpha1_KubeletConfiguration(in *kubeletconfig.KubeletConfiguration, out *KubeletConfiguration, s conversion.Scope) error {
if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.ConfigTrialDuration, &out.ConfigTrialDuration, s); err != nil {
return err
}
if err := v1.Convert_int32_To_Pointer_int32(&in.CrashLoopThreshold, &out.CrashLoopThreshold, s); err != nil {
return err
}
out.ConfigTrialDuration = (*v1.Duration)(unsafe.Pointer(in.ConfigTrialDuration))
out.PodManifestPath = in.PodManifestPath
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
@@ -143,15 +143,6 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
**out = **in
}
}
if in.CrashLoopThreshold != nil {
in, out := &in.CrashLoopThreshold, &out.CrashLoopThreshold
if *in == nil {
*out = nil
} else {
*out = new(int32)
**out = **in
}
}
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
out.HTTPCheckFrequency = in.HTTPCheckFrequency
@@ -23,18 +23,8 @@ import (
containermanager "k8s.io/kubernetes/pkg/kubelet/cm"
)

// MaxCrashLoopThreshold is the maximum allowed KubeletConfiguraiton.CrashLoopThreshold
const MaxCrashLoopThreshold = 10

// ValidateKubeletConfiguration validates `kc` and returns an error if it is invalid
func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error {
// restrict crashloop threshold to between 0 and `maxCrashLoopThreshold`, inclusive
// more than `maxStartups=maxCrashLoopThreshold` adds unnecessary bloat to the .startups.json file,
// and negative values would be silly.
if kc.CrashLoopThreshold < 0 || kc.CrashLoopThreshold > MaxCrashLoopThreshold {
return fmt.Errorf("field `CrashLoopThreshold` must be between 0 and %d, inclusive", MaxCrashLoopThreshold)
}

if !kc.CgroupsPerQOS && len(kc.EnforceNodeAllocatable) > 0 {
return fmt.Errorf("node allocatable enforcement is not supported unless Cgroups Per QOS feature is turned on")
}
@@ -21,6 +21,7 @@ limitations under the License.
package kubeletconfig

import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
api "k8s.io/kubernetes/pkg/api"
@@ -124,7 +125,15 @@ func (in *KubeletAuthorization) DeepCopy() *KubeletAuthorization {
func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ConfigTrialDuration = in.ConfigTrialDuration
if in.ConfigTrialDuration != nil {
in, out := &in.ConfigTrialDuration, &out.ConfigTrialDuration
if *in == nil {
*out = nil
} else {
*out = new(v1.Duration)
**out = **in
}
}
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
out.HTTPCheckFrequency = in.HTTPCheckFrequency
@@ -16,17 +16,14 @@ go_library(
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/validation:go_default_library",
"//pkg/kubelet/kubeletconfig/badconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint/store:go_default_library",
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/startups:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/equal:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/version:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
@@ -49,10 +46,8 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/kubeletconfig/badconfig:all-srcs",
"//pkg/kubelet/kubeletconfig/checkpoint:all-srcs",
"//pkg/kubelet/kubeletconfig/configfiles:all-srcs",
"//pkg/kubelet/kubeletconfig/startups:all-srcs",
"//pkg/kubelet/kubeletconfig/status:all-srcs",
"//pkg/kubelet/kubeletconfig/util/codec:all-srcs",
"//pkg/kubelet/kubeletconfig/util/equal:all-srcs",
@ -1,47 +0,0 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"badconfig_test.go",
|
||||
"fstracker_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"badconfig.go",
|
||||
"fstracker.go",
|
||||
],
|
||||
deps = [
|
||||
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
@ -1,83 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package badconfig
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Tracker tracks "bad" configurations in a storage layer
|
||||
type Tracker interface {
|
||||
// Initialize sets up the storage layer
|
||||
Initialize() error
|
||||
// MarkBad marks `uid` as a bad config and records `reason` as the reason for marking it bad
|
||||
MarkBad(uid, reason string) error
|
||||
// Entry returns the Entry for `uid` if it exists in the tracker, otherise nil
|
||||
Entry(uid string) (*Entry, error)
|
||||
}
|
||||
|
||||
// Entry describes when a configuration was marked bad and why
|
||||
type Entry struct {
|
||||
Time string `json:"time"`
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
// markBad makes an entry in `m` for the config with `uid` and reason `reason`
|
||||
func markBad(m map[string]Entry, uid, reason string) {
|
||||
now := time.Now()
|
||||
entry := Entry{
|
||||
Time: now.Format(time.RFC3339), // use RFC3339 time format
|
||||
Reason: reason,
|
||||
}
|
||||
m[uid] = entry
|
||||
}
|
||||
|
||||
// getEntry returns the Entry for `uid` in `m`, or nil if no such entry exists
|
||||
func getEntry(m map[string]Entry, uid string) *Entry {
|
||||
entry, ok := m[uid]
|
||||
if ok {
|
||||
return &entry
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// encode retuns a []byte representation of `m`, for saving `m` to a storage layer
|
||||
func encode(m map[string]Entry) ([]byte, error) {
|
||||
data, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// decode transforms a []byte into a `map[string]Entry`, or returns an error if it can't produce said map
|
||||
// if `data` is empty, returns an empty map
|
||||
func decode(data []byte) (map[string]Entry, error) {
|
||||
// create the map
|
||||
m := map[string]Entry{}
|
||||
// if the data is empty, just return the empty map
|
||||
if len(data) == 0 {
|
||||
return m, nil
|
||||
}
|
||||
// otherwise unmarshal the json
|
||||
if err := json.Unmarshal(data, &m); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal, error: %v", err)
|
||||
}
|
||||
return m, nil
|
||||
}
|
@ -1,157 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package badconfig
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMarkBad(t *testing.T) {
|
||||
// build a map with one entry
|
||||
m := map[string]Entry{}
|
||||
uid := "uid"
|
||||
reason := "reason"
|
||||
markBad(m, uid, reason)
|
||||
|
||||
// the entry should exist for uid
|
||||
entry, ok := m[uid]
|
||||
if !ok {
|
||||
t.Fatalf("expect entry for uid %q, but none exists", uid)
|
||||
}
|
||||
|
||||
// the entry's reason should match the reason it was marked bad with
|
||||
if entry.Reason != reason {
|
||||
t.Errorf("expect Entry.Reason %q, but got %q", reason, entry.Reason)
|
||||
}
|
||||
|
||||
// the entry's timestamp should be in RFC3339 format
|
||||
if err := assertRFC3339(entry.Time); err != nil {
|
||||
t.Errorf("expect Entry.Time to use RFC3339 format, but got %q, error: %v", entry.Time, err)
|
||||
}
|
||||
|
||||
// it should be the only entry in the map thus far
|
||||
if n := len(m); n != 1 {
|
||||
t.Errorf("expect one entry in the map, but got %d", n)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestGetEntry(t *testing.T) {
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
expect := &Entry{
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}
|
||||
m := map[string]Entry{uid: *expect}
|
||||
|
||||
// should return nil for entries that don't exist
|
||||
bogus := "bogus-uid"
|
||||
if e := getEntry(m, bogus); e != nil {
|
||||
t.Errorf("expect nil for entries that don't exist (uid: %q), but got %#v", bogus, e)
|
||||
}
|
||||
|
||||
// should return non-nil for entries that exist
|
||||
if e := getEntry(m, uid); e == nil {
|
||||
t.Errorf("expect non-nil for entries that exist (uid: %q), but got nil", uid)
|
||||
} else if !reflect.DeepEqual(expect, e) {
|
||||
// entry should match what we inserted for the given UID
|
||||
t.Errorf("expect entry for uid %q to match %#v, but got %#v", uid, expect, e)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode(t *testing.T) {
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
expect := fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp)
|
||||
m := map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}}
|
||||
|
||||
data, err := encode(m)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
json := string(data)
|
||||
|
||||
if json != expect {
|
||||
t.Errorf("expect encoding of %#v to match %q, but got %q", m, expect, json)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode(t *testing.T) {
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
valid := []byte(fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp))
|
||||
expect := map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}}
|
||||
|
||||
// decoding valid json should result in an object with the correct values
|
||||
if m, err := decode(valid); err != nil {
|
||||
t.Errorf("expect decoding valid json %q to produce a map, but got error: %v", valid, err)
|
||||
} else if !reflect.DeepEqual(expect, m) {
|
||||
// m should equal expected decoded object
|
||||
t.Errorf("expect decoding valid json %q to produce %#v, but got %#v", valid, expect, m)
|
||||
}
|
||||
|
||||
// decoding invalid json should return an error
|
||||
invalid := []byte(`invalid`)
|
||||
if m, err := decode(invalid); err == nil {
|
||||
t.Errorf("expect decoding invalid json %q to return an error, but decoded to %#v", invalid, m)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundTrip(t *testing.T) {
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
expect := map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}}
|
||||
|
||||
// test that encoding and decoding an object results in the same value
|
||||
data, err := encode(expect)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to encode %#v, error: %v", expect, err)
|
||||
}
|
||||
after, err := decode(data)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to decode %q, error: %v", string(data), err)
|
||||
}
|
||||
if !reflect.DeepEqual(expect, after) {
|
||||
t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
|
||||
}
|
||||
}
|
||||
|
||||
func assertRFC3339(s string) error {
|
||||
tm, err := time.Parse(time.RFC3339, s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("expect RFC3339 format, but failed to parse, error: %v", err)
|
||||
}
|
||||
// parsing succeeded, now finish round-trip and compare
|
||||
rt := tm.Format(time.RFC3339)
|
||||
if rt != s {
|
||||
return fmt.Errorf("expect RFC3339 format, but failed to round trip unchanged, original %q, round-trip %q", s, rt)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,105 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package badconfig
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
|
||||
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
|
||||
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
|
||||
)
|
||||
|
||||
const (
|
||||
badConfigsFile = "bad-configs.json"
|
||||
)
|
||||
|
||||
// fsTracker tracks bad config in the local filesystem
|
||||
type fsTracker struct {
|
||||
// fs is the filesystem to use for storage operations; can be mocked for testing
|
||||
fs utilfs.Filesystem
|
||||
// trackingDir is the absolute path to the storage directory for fsTracker
|
||||
trackingDir string
|
||||
}
|
||||
|
||||
// NewFsTracker returns a new Tracker that will store information in the `trackingDir`
|
||||
func NewFsTracker(fs utilfs.Filesystem, trackingDir string) Tracker {
|
||||
return &fsTracker{
|
||||
fs: fs,
|
||||
trackingDir: trackingDir,
|
||||
}
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) Initialize() error {
|
||||
utillog.Infof("initializing bad config tracking directory %q", tracker.trackingDir)
|
||||
if err := utilfiles.EnsureDir(tracker.fs, tracker.trackingDir); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := utilfiles.EnsureFile(tracker.fs, filepath.Join(tracker.trackingDir, badConfigsFile)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) MarkBad(uid, reason string) error {
|
||||
m, err := tracker.load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// create the bad config entry in the map
|
||||
markBad(m, uid, reason)
|
||||
// save the file
|
||||
if err := tracker.save(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) Entry(uid string) (*Entry, error) {
|
||||
m, err := tracker.load()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// return the entry, or nil if it doesn't exist
|
||||
return getEntry(m, uid), nil
|
||||
}
|
||||
|
||||
// load loads the bad-config-tracking file from disk and decodes the map encoding it contains
|
||||
func (tracker *fsTracker) load() (map[string]Entry, error) {
|
||||
path := filepath.Join(tracker.trackingDir, badConfigsFile)
|
||||
// load the file
|
||||
data, err := tracker.fs.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return decode(data)
|
||||
}
|
||||
|
||||
// save replaces the contents of the bad-config-tracking file with the encoding of `m`
|
||||
func (tracker *fsTracker) save(m map[string]Entry) error {
|
||||
// encode the map
|
||||
data, err := encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// save the file
|
||||
path := filepath.Join(tracker.trackingDir, badConfigsFile)
|
||||
if err := utilfiles.ReplaceFile(tracker.fs, path, data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,255 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package badconfig
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
|
||||
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
|
||||
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
|
||||
)
|
||||
|
||||
const testTrackingDir = "/test-tracking-dir"
|
||||
|
||||
// TODO(mtaufen): this file reuses a lot of test code from badconfig_test.go, should consolidate
|
||||
|
||||
func newInitializedFakeFsTracker() (*fsTracker, error) {
|
||||
fs := utilfs.NewFakeFs()
|
||||
tracker := NewFsTracker(fs, testTrackingDir)
|
||||
if err := tracker.Initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tracker.(*fsTracker), nil
|
||||
}
|
||||
|
||||
func TestFsTrackerInitialize(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("fsTracker.Initialize() failed with error: %v", err)
|
||||
}
|
||||
|
||||
// check that testTrackingDir exists
|
||||
_, err = tracker.fs.Stat(testTrackingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("expect %q to exist, but stat failed with error: %v", testTrackingDir, err)
|
||||
}
|
||||
|
||||
// check that testTrackingDir contains the badConfigsFile
|
||||
path := filepath.Join(testTrackingDir, badConfigsFile)
|
||||
_, err = tracker.fs.Stat(path)
|
||||
if err != nil {
|
||||
t.Fatalf("expect %q to exist, but stat failed with error: %v", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerMarkBad(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
// create a bad config entry in the fs
|
||||
uid := "uid"
|
||||
reason := "reason"
|
||||
tracker.MarkBad(uid, reason)
|
||||
|
||||
// load the map from the fs
|
||||
m, err := tracker.load()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load bad-config data, error: %v", err)
|
||||
}
|
||||
|
||||
// the entry should exist for uid
|
||||
entry, ok := m[uid]
|
||||
if !ok {
|
||||
t.Fatalf("expect entry for uid %q, but none exists", uid)
|
||||
}
|
||||
|
||||
// the entry's reason should match the reason it was marked bad with
|
||||
if entry.Reason != reason {
|
||||
t.Errorf("expect Entry.Reason %q, but got %q", reason, entry.Reason)
|
||||
}
|
||||
|
||||
// the entry's timestamp should be in RFC3339 format
|
||||
if err := assertRFC3339(entry.Time); err != nil {
|
||||
t.Errorf("expect Entry.Time to use RFC3339 format, but got %q, error: %v", entry.Time, err)
|
||||
}
|
||||
|
||||
// it should be the only entry in the map thus far
|
||||
if n := len(m); n != 1 {
|
||||
t.Errorf("expect one entry in the map, but got %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerEntry(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
// manually save a correct entry to fs
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
expect := &Entry{
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}
|
||||
m := map[string]Entry{uid: *expect}
|
||||
err = tracker.save(m)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to save bad-config data, error: %v", err)
|
||||
}
|
||||
|
||||
// should return nil for entries that don't exist
|
||||
bogus := "bogus-uid"
|
||||
e, err := tracker.Entry(bogus)
|
||||
if err != nil {
|
||||
t.Errorf("expect nil for entries that don't exist (uid: %q), but got error: %v", bogus, err)
|
||||
} else if e != nil {
|
||||
t.Errorf("expect nil for entries that don't exist (uid: %q), but got %#v", bogus, e)
|
||||
}
|
||||
|
||||
// should return non-nil for entries that exist
|
||||
e, err = tracker.Entry(uid)
|
||||
if err != nil {
|
||||
t.Errorf("expect non-nil for entries that exist (uid: %q), but got error: %v", uid, err)
|
||||
} else if e == nil {
|
||||
t.Errorf("expect non-nil for entries that exist (uid: %q), but got nil", uid)
|
||||
} else if !reflect.DeepEqual(expect, e) {
|
||||
// entry should match what we inserted for the given UID
|
||||
t.Errorf("expect entry for uid %q to match %#v, but got %#v", uid, expect, e)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(mtaufen): test loading invalid json (see startups/fstracker_test.go for example)
|
||||
func TestFsTrackerLoad(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
uid := "uid"
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
cases := []struct {
|
||||
desc string
|
||||
data []byte
|
||||
expect map[string]Entry
|
||||
err string
|
||||
}{
|
||||
// empty file
|
||||
{"empty file", []byte(""), map[string]Entry{}, ""},
|
||||
// empty map
|
||||
{"empty map", []byte("{}"), map[string]Entry{}, ""},
|
||||
// valid json
|
||||
{"valid json", []byte(fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp)),
|
||||
map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}}, ""},
|
||||
// invalid json
|
||||
{"invalid json", []byte(`*`), map[string]Entry{}, "failed to unmarshal"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
// save a file containing the correct serialization
|
||||
utilfiles.ReplaceFile(tracker.fs, filepath.Join(testTrackingDir, badConfigsFile), c.data)
|
||||
|
||||
// loading valid json should result in an object with the correct values
|
||||
m, err := tracker.load()
|
||||
if utiltest.SkipRest(t, c.desc, err, c.err) {
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(c.expect, m) {
|
||||
// m should equal expected decoded object
|
||||
t.Errorf("case %q, expect %#v but got %#v", c.desc, c.expect, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerSave(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
uid := "uid"
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
cases := []struct {
|
||||
desc string
|
||||
m map[string]Entry
|
||||
expect string
|
||||
err string
|
||||
}{
|
||||
// empty map
|
||||
{"empty map", map[string]Entry{}, "{}", ""},
|
||||
// 1-entry map
|
||||
{"1-entry map",
|
||||
map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}},
|
||||
fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp), ""},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
if err := tracker.save(c.m); utiltest.SkipRest(t, c.desc, err, c.err) {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := tracker.fs.ReadFile(filepath.Join(testTrackingDir, badConfigsFile))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read bad-config file, error: %v", err)
|
||||
}
|
||||
json := string(data)
|
||||
|
||||
if json != c.expect {
|
||||
t.Errorf("case %q, expect %q but got %q", c.desc, c.expect, json)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerRoundTrip(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
nowstamp := time.Now().Format(time.RFC3339)
|
||||
uid := "uid"
|
||||
expect := map[string]Entry{uid: {
|
||||
Time: nowstamp,
|
||||
Reason: "reason",
|
||||
}}
|
||||
|
||||
// test that saving and loading an object results in the same value
|
||||
err = tracker.save(expect)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to save bad-config data, error: %v", err)
|
||||
}
|
||||
after, err := tracker.load()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load bad-config data, error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(expect, after) {
|
||||
t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
|
||||
}
|
||||
}
|
@@ -75,32 +75,8 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri
// If we get here:
// - there is no need to restart to update the current config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to update to the correct condition
errfmt := `sync succeeded but unable to clear "failed to sync" message from ConfigOK, error: %v`

currentUID := ""
if currentSource, err := cc.checkpointStore.Current(); err != nil {
utillog.Errorf(errfmt, err)
return
} else if currentSource != nil {
currentUID = currentSource.UID()
}

lkgUID := ""
if lkgSource, err := cc.checkpointStore.LastKnownGood(); err != nil {
utillog.Errorf(errfmt, err)
return
} else if lkgSource != nil {
lkgUID = lkgSource.UID()
}

currentBadReason := ""
if entry, err := cc.badConfigTracker.Entry(currentUID); err != nil {
utillog.Errorf(errfmt, err)
} else if entry != nil {
currentBadReason = entry.Reason
}
cc.configOK.ClearFailedSyncCondition(currentUID, lkgUID, currentBadReason, cc.initConfig != nil)
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the condition
cc.configOK.ClearFailedSyncCondition()
}

// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,
@@ -27,12 +27,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
"k8s.io/kubernetes/pkg/version"

"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/badconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/startups"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
@@ -40,10 +37,8 @@ import (
)

const (
badConfigTrackingDir = "bad-config-tracking"
startupTrackingDir = "startup-tracking"
checkpointsDir = "checkpoints"
initConfigDir = "init"
checkpointsDir = "checkpoints"
initConfigDir = "init"
)

// Controller is the controller which, among other things:
@@ -51,7 +46,6 @@ const (
// - checkpoints configuration to disk
// - downloads new configuration from the API server
// - validates configuration
// - monitors for potential crash-loops caused by new configurations
// - tracks the last-known-good configuration, and rolls-back to last-known-good when necessary
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/dynamic-kubelet-configuration.md
type Controller struct {
@@ -78,12 +72,6 @@ type Controller struct {

// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store

// badConfigTracker persists bad-config records to a storage layer
badConfigTracker badconfig.Tracker

// startupTracker persists Kubelet startup records, used for crash-loop detection, to a storage layer
startupTracker startups.Tracker
}

// NewController constructs a new Controller object and returns it. Directory paths must be absolute.
@@ -109,14 +97,6 @@ func NewController(initConfigDir string,
dynamicConfig = true
}

// Get the current kubelet version; bad-config and startup-tracking information can be kubelet-version specific,
// e.g. a bug that crash loops an old Kubelet under a given config might be fixed in a new Kubelet or vice-versa,
// validation might be relaxed in a new Kubelet, etc.
// We also don't want a change in a file format to break Kubelet upgrades; this makes sure a new kubelet gets
// a fresh dir to put its config health data in.
// Note that config checkpoints use the api machinery to store ConfigMaps, and thus get file format versioning for free.
kubeletVersion := version.Get().String()

return &Controller{
dynamicConfig: dynamicConfig,
defaultConfig: defaultConfig,
@@ -124,8 +104,6 @@ func NewController(initConfigDir string,
pendingConfigSource: make(chan bool, 1),
configOK: status.NewConfigOKCondition(),
checkpointStore: store.NewFsStore(fs, filepath.Join(dynamicConfigDir, checkpointsDir)),
badConfigTracker: badconfig.NewFsTracker(fs, filepath.Join(dynamicConfigDir, badConfigTrackingDir, kubeletVersion)),
startupTracker: startups.NewFsTracker(fs, filepath.Join(dynamicConfigDir, startupTrackingDir, kubeletVersion)),
initLoader: initLoader,
}, nil
}
@@ -170,11 +148,6 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return nil, err
}

// record the kubelet startup time, used for crashloop detection
if err := cc.startupTracker.RecordStartup(); err != nil {
return nil, err
}

// determine UID of the current config source
curUID := ""
if curSource, err := cc.checkpointStore.Current(); err != nil {
@@ -188,14 +161,6 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return cc.localConfig(), nil
} // Assert: we will not use the local configurations, unless we roll back to lkg; curUID is non-empty

// check whether the current config is marked bad
if entry, err := cc.badConfigTracker.Entry(curUID); err != nil {
return nil, err
} else if entry != nil {
utillog.Infof("current config %q was marked bad for reason %q at time %q", curUID, entry.Reason, entry.Time)
return cc.lkgRollback(entry.Reason)
}

// TODO(mtaufen): consider re-verifying integrity and re-attempting download when a load/verify/parse/validate
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
// or something else scary (unless someone is using a 0-length trial period)
@@ -203,33 +168,26 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
// load the current config
checkpoint, err := cc.checkpointStore.Load(curUID)
if err != nil {
// TODO(mtaufen): rollback and mark bad for now, but this could reasonably be handled by re-attempting a download,
// TODO(mtaufen): rollback for now, but this could reasonably be handled by re-attempting a download,
// it probably indicates some sort of corruption
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailLoadReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailLoadReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}

// parse the checkpoint into a KubeletConfiguration
cur, err := checkpoint.Parse()
if err != nil {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailParseReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailParseReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}

// validate current config
if err := validation.ValidateKubeletConfiguration(cur); err != nil {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailValidateReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailValidateReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}

// check for crash loops if we're still in the trial period
// when the trial period is over, the current config becomes the last-known-good
if trial, err := cc.inTrial(cur.ConfigTrialDuration.Duration); err != nil {
return nil, err
} else if trial {
if crashing, err := cc.crashLooping(cur.CrashLoopThreshold); err != nil {
return nil, err
} else if crashing {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailCrashLoopReasonFmt, curUID), "")
}
} else {
// when the trial period is over, the current config becomes the last-known-good
} else if !trial {
if err := cc.graduateCurrentToLastKnownGood(); err != nil {
return nil, err
}
@@ -293,14 +251,6 @@ func (cc *Controller) initialize() error {
if err := cc.checkpointStore.Initialize(); err != nil {
return err
}
// initialize bad config tracker
if err := cc.badConfigTracker.Initialize(); err != nil {
return err
}
// initialize startup tracker
if err := cc.startupTracker.Initialize(); err != nil {
return err
}
return nil
}

@@ -327,21 +277,6 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
return false, nil
}

// crashLooping returns true if the number of startups since the last modification of the current config exceeds `threshold`, false otherwise
func (cc *Controller) crashLooping(threshold int32) (bool, error) {
// determine the last time the current config changed
modTime, err := cc.checkpointStore.CurrentModified()
if err != nil {
return false, err
}
// get the number of startups since that modification time
num, err := cc.startupTracker.StartupsSince(modTime)
if err != nil {
return false, err
}
return num > threshold, nil
}

// graduateCurrentToLastKnownGood sets the last-known-good UID on the checkpointStore
// to the same value as the current UID maintained by the checkpointStore
func (cc *Controller) graduateCurrentToLastKnownGood() error {
@@ -26,19 +26,10 @@ import (
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)

// badRollback makes an entry in the bad-config-tracking file for `uid` with `reason`, and returns the result of rolling back to the last-known-good config
func (cc *Controller) badRollback(uid, reason, detail string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Errorf(fmt.Sprintf("%s, %s", reason, detail))
if err := cc.badConfigTracker.MarkBad(uid, reason); err != nil {
return nil, err
}
return cc.lkgRollback(reason)
}

// lkgRollback returns a valid last-known-good configuration, and updates the `cc.configOK` condition
// regarding the `reason` for the rollback, or returns an error if a valid last-known-good could not be produced
func (cc *Controller) lkgRollback(reason string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Infof("rolling back to last-known-good config")
func (cc *Controller) lkgRollback(reason, detail string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Errorf(fmt.Sprintf("%s, %s", reason, detail))

lkgUID := ""
if lkgSource, err := cc.checkpointStore.LastKnownGood(); err != nil {
@ -1,48 +0,0 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"fstracker.go",
|
||||
"startups.go",
|
||||
],
|
||||
deps = [
|
||||
"//pkg/kubelet/apis/kubeletconfig/validation:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"fstracker_test.go",
|
||||
"startups_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
|
||||
],
|
||||
)
|
@ -1,127 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package startups
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
|
||||
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
|
||||
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
|
||||
)
|
||||
|
||||
const (
|
||||
startupsFile = "startups.json"
|
||||
)
|
||||
|
||||
// fsTracker tracks startups in the local filesystem
|
||||
type fsTracker struct {
|
||||
// fs is the filesystem to use for storage operations; can be mocked for testing
|
||||
fs utilfs.Filesystem
|
||||
// trackingDir is the absolute path to the storage directory for fsTracker
|
||||
trackingDir string
|
||||
}
|
||||
|
||||
// NewFsTracker returns a Tracker that will store information in the `trackingDir`
|
||||
func NewFsTracker(fs utilfs.Filesystem, trackingDir string) Tracker {
|
||||
return &fsTracker{
|
||||
fs: fs,
|
||||
trackingDir: trackingDir,
|
||||
}
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) Initialize() error {
|
||||
utillog.Infof("initializing startups tracking directory %q", tracker.trackingDir)
|
||||
if err := utilfiles.EnsureDir(tracker.fs, tracker.trackingDir); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := utilfiles.EnsureFile(tracker.fs, filepath.Join(tracker.trackingDir, startupsFile)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) RecordStartup() error {
|
||||
// load the file
|
||||
ls, err := tracker.load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ls = recordStartup(ls)
|
||||
|
||||
// save the file
|
||||
err = tracker.save(ls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tracker *fsTracker) StartupsSince(t time.Time) (int32, error) {
|
||||
// load the startups-tracking file
|
||||
ls, err := tracker.load()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return startupsSince(ls, t)
|
||||
}
|
||||
|
||||
// TODO(mtaufen): refactor into encode/decode like in badconfig.go
|
||||
|
||||
// load loads the startups-tracking file from disk
|
||||
func (tracker *fsTracker) load() ([]string, error) {
|
||||
path := filepath.Join(tracker.trackingDir, startupsFile)
|
||||
|
||||
// load the file
|
||||
b, err := tracker.fs.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load startups-tracking file %q, error: %v", path, err)
|
||||
}
|
||||
|
||||
// parse json into the slice
|
||||
ls := []string{}
|
||||
|
||||
// if the file is empty, just return empty slice
|
||||
if len(b) == 0 {
|
||||
return ls, nil
|
||||
}
|
||||
|
||||
// otherwise unmarshal the json
|
||||
if err := json.Unmarshal(b, &ls); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal json from startups-tracking file %q, error: %v", path, err)
|
||||
}
|
||||
return ls, nil
|
||||
}
|
||||
|
||||
// save replaces the contents of the startups-tracking file with `ls`
|
||||
func (tracker *fsTracker) save(ls []string) error {
|
||||
// marshal the json
|
||||
b, err := json.Marshal(ls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// save the file
|
||||
path := filepath.Join(tracker.trackingDir, startupsFile)
|
||||
if err := utilfiles.ReplaceFile(tracker.fs, path, b); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,294 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package startups
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
|
||||
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
|
||||
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
|
||||
)
|
||||
|
||||
const testTrackingDir = "/test-tracking-dir"
|
||||
|
||||
// TODO(mtaufen): this file reuses a lot of test code from startups_test.go, should consolidate
|
||||
|
||||
func newInitializedFakeFsTracker() (*fsTracker, error) {
|
||||
fs := utilfs.NewFakeFs()
|
||||
tracker := NewFsTracker(fs, testTrackingDir)
|
||||
if err := tracker.Initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tracker.(*fsTracker), nil
|
||||
}
|
||||
|
||||
func TestFsTrackerInitialize(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("tracker.Initialize() failed with error: %v", err)
|
||||
}
|
||||
|
||||
// check that testTrackingDir exists
|
||||
_, err = tracker.fs.Stat(testTrackingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("expect %q to exist, but stat failed with error: %v", testTrackingDir, err)
|
||||
}
|
||||
|
||||
// check that testTrackingDir contains the startupsFile
|
||||
path := filepath.Join(testTrackingDir, startupsFile)
|
||||
_, err = tracker.fs.Stat(path)
|
||||
if err != nil {
|
||||
t.Fatalf("expect %q to exist, but stat failed with error: %v", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerRecordStartup(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
fullList := func() []string {
|
||||
ls := []string{}
|
||||
for i := maxStartups; i > 0; i-- {
|
||||
// subtract decreasing amounts so timestamps increase but remain in the past
|
||||
ls = append(ls, now.Add(-time.Duration(i)*time.Second).Format(time.RFC3339))
|
||||
}
|
||||
return ls
|
||||
}()
|
||||
cases := []struct {
|
||||
desc string
|
||||
ls []string
|
||||
expectHead []string // what we expect the first length-1 elements to look like after recording a new timestamp
|
||||
expectLen int // how long the list should be after recording
|
||||
}{
|
||||
// start empty
|
||||
{
|
||||
"start empty",
|
||||
[]string{},
|
||||
[]string{},
|
||||
1,
|
||||
},
|
||||
// start non-empty
|
||||
{
|
||||
"start non-empty",
|
||||
// subtract 1 so stamps are in the past
|
||||
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
|
||||
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
|
||||
2,
|
||||
},
|
||||
// rotate list
|
||||
{
|
||||
"rotate list",
|
||||
// make a slice with len == maxStartups, containing monotonically-increasing timestamps
|
||||
fullList,
|
||||
fullList[1:],
|
||||
maxStartups,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
// save the starting point, record a "startup" time, then load list from fs
|
||||
if err := tracker.save(c.ls); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if err := tracker.RecordStartup(); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
ls, err := tracker.load()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if c.expectLen != len(ls) {
|
||||
t.Errorf("case %q, expected list %q to have length %d", c.desc, ls, c.expectLen)
|
||||
}
|
||||
if !reflect.DeepEqual(c.expectHead, ls[:len(ls)-1]) {
|
||||
t.Errorf("case %q, expected elements 0 through n-1 of list %q to equal %q", c.desc, ls, c.expectHead)
|
||||
}
|
||||
// timestamps should be monotonically increasing (assuming system clock isn't jumping around at least)
|
||||
if sorted, err := timestampsSorted(ls); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if !sorted {
|
||||
t.Errorf("case %q, expected monotonically increasing timestamps, but got %q", c.desc, ls)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerStartupsSince(t *testing.T) {
|
||||
tracker, err := newInitializedFakeFsTracker()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to construct a tracker, error: %v", err)
|
||||
}
|
||||
|
||||
now, err := time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
ls []string
|
||||
expect int32
|
||||
err string
|
||||
}{
|
||||
// empty list
|
||||
{"empty list", []string{}, 0, ""},
|
||||
// no startups since
|
||||
{
|
||||
"no startups since",
|
||||
[]string{"2014-01-02T15:04:05Z", "2015-01-02T15:04:05Z", "2016-01-02T15:04:05Z"},
|
||||
0,
|
||||
"",
|
||||
},
|
||||
// 2 startups since
|
||||
{
|
||||
"some startups since",
|
||||
[]string{"2016-01-02T15:04:05Z", "2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z"},
|
||||
2,
|
||||
"",
|
||||
},
|
||||
// all startups since
|
||||
{
|
||||
"all startups since",
|
||||
[]string{"2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z", "2020-01-02T15:04:05Z"},
|
||||
3,
|
||||
"",
|
||||
},
|
||||
// invalid timestamp
|
||||
{"invalid timestamp", []string{"2018-01-02T15:04:05Z08:00"}, 0, "failed to parse"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
if err := tracker.save(c.ls); err != nil {
|
||||
t.Fatalf("unexected error: %v", err)
|
||||
}
|
||||
num, err := tracker.StartupsSince(now)
|
||||
if utiltest.SkipRest(t, c.desc, err, c.err) {
|
||||
continue
|
||||
}
|
||||
if num != c.expect {
|
||||
t.Errorf("case %q, expect %d startups but got %d", c.desc, c.expect, num)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFsTrackerLoad(t *testing.T) {
	tracker, err := newInitializedFakeFsTracker()
	if err != nil {
		t.Fatalf("failed to construct a tracker, error: %v", err)
	}

	nowstamp := time.Now().Format(time.RFC3339)
	cases := []struct {
		desc   string
		data   []byte
		expect []string
		err    string
	}{
		// empty file
		{"empty file", []byte(""), []string{}, ""},
		// empty list
		{"empty list", []byte("[]"), []string{}, ""},
		// valid json
		{"valid json", []byte(fmt.Sprintf(`["%s"]`, nowstamp)), []string{nowstamp}, ""},
		// invalid json
		{"invalid json", []byte(`*`), []string{}, "failed to unmarshal"},
	}

	for _, c := range cases {
		// save a file containing the correct serialization
		utilfiles.ReplaceFile(tracker.fs, filepath.Join(testTrackingDir, startupsFile), c.data)

		// loading valid json should result in an object with the correct serialization
		ls, err := tracker.load()
		if utiltest.SkipRest(t, c.desc, err, c.err) {
			continue
		}
		if !reflect.DeepEqual(c.expect, ls) {
			// ls should equal expected decoded object
			t.Errorf("case %q, expect %#v but got %#v", c.desc, c.expect, ls)
		}
	}

}

func TestFsTrackerSave(t *testing.T) {
	tracker, err := newInitializedFakeFsTracker()
	if err != nil {
		t.Fatalf("failed to construct a tracker, error: %v", err)
	}

	nowstamp := time.Now().Format(time.RFC3339)
	cases := []struct {
		desc   string
		ls     []string
		expect string
		err    string
	}{
		// empty list
		{"empty list", []string{}, "[]", ""},
		// 1-entry list
		{"valid json", []string{nowstamp}, fmt.Sprintf(`["%s"]`, nowstamp), ""},
	}

	for _, c := range cases {
		if err := tracker.save(c.ls); utiltest.SkipRest(t, c.desc, err, c.err) {
			continue
		}

		data, err := tracker.fs.ReadFile(filepath.Join(testTrackingDir, startupsFile))
		if err != nil {
			t.Fatalf("failed to read startups file, error: %v", err)
		}
		json := string(data)

		if json != c.expect {
			t.Errorf("case %q, expect %q but got %q", c.desc, c.expect, json)
		}
	}
}

func TestFsTrackerRoundTrip(t *testing.T) {
	tracker, err := newInitializedFakeFsTracker()
	if err != nil {
		t.Fatalf("failed to construct a tracker, error: %v", err)
	}

	nowstamp := time.Now().Format(time.RFC3339)
	expect := []string{nowstamp}

	// test that saving and loading an object results in the same value
	err = tracker.save(expect)
	if err != nil {
		t.Fatalf("failed to save startups data, error: %v", err)
	}
	after, err := tracker.load()
	if err != nil {
		t.Fatalf("failed to load startups data, error: %v", err)
	}
	if !reflect.DeepEqual(expect, after) {
		t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
	}
}

@ -1,69 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package startups

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
)

const (
	// we allow one extra startup to account for the startup necessary to update configuration
	maxStartups = validation.MaxCrashLoopThreshold + 1
)

// Tracker tracks Kubelet startups in a storage layer
type Tracker interface {
	// Initialize sets up the storage layer
	Initialize() error
	// RecordStartup records the current time as a Kubelet startup
	RecordStartup() error
	// StartupsSince returns the number of Kubelet startups recorded since `t`
	StartupsSince(t time.Time) (int32, error)
}

func startupsSince(ls []string, start time.Time) (int32, error) {
	// since the list is append-only we only need to count the number of timestamps after `start`
	startups := int32(0)
	for _, stamp := range ls {
		t, err := time.Parse(time.RFC3339, stamp)
		if err != nil {
			return 0, fmt.Errorf("failed to parse timestamp while counting startups, error: %v", err)
		}
		if t.After(start) {
			startups++
		}
	}
	return startups, nil
}
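
// Worked example (illustrative; it mirrors the "some startups since" test case): with
// start == "2017-01-02T15:04:05Z" and
// ls == []string{"2016-01-02T15:04:05Z", "2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z"},
// startupsSince returns 2, because only the 2018 and 2019 stamps fall after start.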

func recordStartup(ls []string) []string {
	// record current time
	now := time.Now()
	stamp := now.Format(time.RFC3339) // use RFC3339 time format
	ls = append(ls, stamp)

	// rotate the slice if necessary
	if len(ls) > maxStartups {
		ls = ls[1:]
	}

	// return the new slice
	return ls
}
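
// Worked example (illustrative; it mirrors the "rotate list" test case): if ls already
// holds maxStartups timestamps, recordStartup appends the new stamp and drops the
// oldest entry, so the returned slice still has length maxStartups.
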
@ -1,157 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package startups

import (
	"reflect"
	"testing"
	"time"

	utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)

func TestRecordStartup(t *testing.T) {
	now := time.Now()

	fullList := func() []string {
		ls := []string{}
		for i := maxStartups; i > 0; i-- {
			// subtract decreasing amounts so timestamps increase but remain in the past
			ls = append(ls, now.Add(-time.Duration(i)*time.Second).Format(time.RFC3339))
		}
		return ls
	}()
	cases := []struct {
		desc       string
		ls         []string
		expectHead []string // what we expect the first length-1 elements to look like after recording a new timestamp
		expectLen  int      // how long the list should be after recording
	}{
		// start empty
		{
			"start empty",
			[]string{},
			[]string{},
			1,
		},
		// start non-empty
		{
			"start non-empty",
			// subtract 1 so stamps are in the past
			[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
			[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
			2,
		},
		// rotate list
		{
			"rotate list",
			// make a slice with len == maxStartups, containing monotonically-increasing timestamps
			fullList,
			fullList[1:],
			maxStartups,
		},
	}

	for _, c := range cases {
		ls := recordStartup(c.ls)
		if c.expectLen != len(ls) {
			t.Errorf("case %q, expected list %q to have length %d", c.desc, ls, c.expectLen)
		}
		if !reflect.DeepEqual(c.expectHead, ls[:len(ls)-1]) {
			t.Errorf("case %q, expected elements 0 through n-1 of list %q to equal %q", c.desc, ls, c.expectHead)
		}
		// timestamps should be monotonically increasing (assuming system clock isn't jumping around at least)
		if sorted, err := timestampsSorted(ls); err != nil {
			t.Fatalf("unexpected error: %v", err)
		} else if !sorted {
			t.Errorf("case %q, expected monotonically increasing timestamps, but got %q", c.desc, ls)
		}
	}
}

func TestStartupsSince(t *testing.T) {
	now, err := time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	cases := []struct {
		desc   string
		ls     []string
		expect int32
		err    string
	}{
		// empty list
		{"empty list", []string{}, 0, ""},
		// no startups since
		{
			"no startups since",
			[]string{"2014-01-02T15:04:05Z", "2015-01-02T15:04:05Z", "2016-01-02T15:04:05Z"},
			0,
			"",
		},
		// 2 startups since
		{
			"some startups since",
			[]string{"2016-01-02T15:04:05Z", "2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z"},
			2,
			"",
		},
		// all startups since
		{
			"all startups since",
			[]string{"2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z", "2020-01-02T15:04:05Z"},
			3,
			"",
		},
		// invalid timestamp
		{"invalid timestamp", []string{"2018-01-02T15:04:05Z08:00"}, 0, "failed to parse"},
	}

	for _, c := range cases {
		num, err := startupsSince(c.ls, now)
		if utiltest.SkipRest(t, c.desc, err, c.err) {
			continue
		}
		if num != c.expect {
			t.Errorf("case %q, expect %d startups but got %d", c.desc, c.expect, num)
		}
	}

}

// returns true if the timestamps are monotonically increasing, false otherwise
func timestampsSorted(ls []string) (bool, error) {
	if len(ls) < 2 {
		return true, nil
	}
	prev, err := time.Parse(time.RFC3339, ls[0])
	if err != nil {
		return false, err
	}
	for _, stamp := range ls[1:] {
		cur, err := time.Parse(time.RFC3339, stamp)
		if err != nil {
			return false, err
		}
		if !cur.After(prev) {
			return false, nil
		}
		prev = cur
	}
	return true, nil
}

@ -18,7 +18,6 @@ package status
import (
	"fmt"
	"strings"
	"sync"
	"time"

@ -83,8 +82,8 @@ type ConfigOKCondition interface {
	Set(message, reason string, status apiv1.ConditionStatus)
	// SetFailedSyncCondition sets the condition for when syncing Kubelet config fails
	SetFailedSyncCondition(reason string)
	// ClearFailedSyncCondition resets ConfigOKCondition to the correct condition for successfully syncing the kubelet config
	ClearFailedSyncCondition(current string, lastKnownGood string, currentBadReason string, initConfig bool)
	// ClearFailedSyncCondition clears the overlay from SetFailedSyncCondition
	ClearFailedSyncCondition()
	// Sync patches the current condition into the Node identified by `nodeName`
	Sync(client clientset.Interface, nodeName string)
}
@ -95,6 +94,8 @@ type configOKCondition struct {
	conditionMux sync.Mutex
	// condition is the current ConfigOK node condition, which will be reported in the Node.status.conditions
	condition *apiv1.NodeCondition
	// failedSyncReason is sent in place of the usual reason when the Kubelet is failing to sync the remote config
	failedSyncReason string
	// pendingCondition; write to this channel to indicate that ConfigOK needs to be synced to the API server
	pendingCondition chan bool
}
@ -142,44 +143,20 @@ func (c *configOKCondition) Set(message, reason string, status apiv1.ConditionSt
// SetFailedSyncCondition updates the ConfigOK status to reflect that we failed to sync to the latest config because we couldn't figure out what
// config to use (e.g. due to a malformed reference, a download failure, etc)
func (c *configOKCondition) SetFailedSyncCondition(reason string) {
	c.Set(c.condition.Message, fmt.Sprintf("failed to sync, desired config unclear, reason: %s", reason), apiv1.ConditionUnknown)
}

// ClearFailedSyncCondition resets ConfigOK to the correct condition for the config UIDs
// `current` and `lastKnownGood`, depending on whether current is bad (non-empty `currentBadReason`)
// and whether an init config exists (`initConfig` is true).
func (c *configOKCondition) ClearFailedSyncCondition(current string,
	lastKnownGood string,
	currentBadReason string,
	initConfig bool) {
	// since our reason-check relies on c.condition we must manually take the lock and use c.unsafeSet instead of c.Set
	c.conditionMux.Lock()
	defer c.conditionMux.Unlock()
	if strings.Contains(c.condition.Reason, "failed to sync, desired config unclear") {
		// if we should report a "current is bad, rolled back" state
		if len(currentBadReason) > 0 {
			if len(current) == 0 {
				if initConfig {
					c.unsafeSet(LkgInitMessage, currentBadReason, apiv1.ConditionFalse)
					return
				}
				c.unsafeSet(LkgDefaultMessage, currentBadReason, apiv1.ConditionFalse)
				return
			}
			c.unsafeSet(fmt.Sprintf(LkgRemoteMessageFmt, lastKnownGood), currentBadReason, apiv1.ConditionFalse)
			return
		}
		// if we should report a "current is ok" state
		if len(current) == 0 {
			if initConfig {
				c.unsafeSet(CurInitMessage, CurInitOKReason, apiv1.ConditionTrue)
				return
			}
			c.unsafeSet(CurDefaultMessage, CurDefaultOKReason, apiv1.ConditionTrue)
			return
		}
		c.unsafeSet(fmt.Sprintf(CurRemoteMessageFmt, current), CurRemoteOKReason, apiv1.ConditionTrue)
	}
	// set the reason overlay and poke the sync worker to send the update
	c.failedSyncReason = fmt.Sprintf("failed to sync, desired config unclear, reason: %s", reason)
	c.pokeSyncWorker()
}

// ClearFailedSyncCondition removes the "failed to sync" reason overlay
func (c *configOKCondition) ClearFailedSyncCondition() {
	c.conditionMux.Lock()
	defer c.conditionMux.Unlock()
	// clear the reason overlay and poke the sync worker to send the update
	c.failedSyncReason = ""
	c.pokeSyncWorker()
}
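
// Note on the new flow (descriptive, based on the code in this diff): SetFailedSyncCondition
// now only records failedSyncReason and pokes the sync worker, and ClearFailedSyncCondition
// resets that overlay the same way; Sync (below) copies the stored condition and substitutes
// the overlay reason into the copy, so the persisted c.condition is never modified by a
// failed-sync report.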

// pokeSyncWorker notes that the ConfigOK condition needs to be synced to the API server
@ -239,6 +216,16 @@ func (c *configOKCondition) Sync(client clientset.Interface, nodeName string) {
		c.condition.LastTransitionTime = remote.LastTransitionTime
	}

	// overlay the failedSyncReason if necessary
	var condition *apiv1.NodeCondition
	if len(c.failedSyncReason) > 0 {
		// get a copy of the condition before we edit it
		condition = c.condition.DeepCopy()
		condition.Reason = c.failedSyncReason
	} else {
		condition = c.condition
	}

	// generate the patch
	mediaType := "application/json"
	info, ok := kuberuntime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), mediaType)