Merge pull request #57221 from mtaufen/kc-event

Automatic merge from submit-queue (batch tested with PRs 57434, 57221, 57417, 57474, 57481). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Send an event just before the Kubelet restarts to use a new config

**What this PR does / why we need it**:
This PR makes the Kubelet send events for configuration changes. This makes it much easier to see a recent history of configuration changes for the Kubelet. 

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #56895

**Special notes for your reviewer**:

**Release note**:
```release-note
NONE
```

/cc @dchen1107 @liggitt @dashpole
This commit is contained in:
Kubernetes Submit Queue
2017-12-20 17:42:37 -08:00
committed by GitHub
8 changed files with 224 additions and 49 deletions

View File

@@ -25,13 +25,16 @@ go_library(
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@@ -34,6 +34,8 @@ import (
type RemoteConfigSource interface {
// UID returns the UID of the remote config source object
UID() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// Download downloads the remote config source object returns a Checkpoint backed by the object,
// or a sanitized failure reason and error if the download fails
Download(client clientset.Interface) (Checkpoint, string, error)
@@ -110,6 +112,13 @@ func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMapRef.UID)
}
const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
func (r *remoteConfigMap) APIPath() string {
ref := r.source.ConfigMapRef
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, string, error) {
var reason string
uid := string(r.source.ConfigMapRef.UID)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package checkpoint
import (
"fmt"
"testing"
"github.com/davecgh/go-spew/spew"
@@ -92,6 +93,20 @@ func TestRemoteConfigMapUID(t *testing.T) {
}
}
func TestRemoteConfigMapAPIPath(t *testing.T) {
name := "name"
namespace := "namespace"
cpt := &remoteConfigMap{
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: ""}},
}
expect := fmt.Sprintf(configMapAPIPathFmt, cpt.source.ConfigMapRef.Namespace, cpt.source.ConfigMapRef.Name)
// APIPath() method should return the correct path to the referenced resource
path := cpt.APIPath()
if expect != path {
t.Errorf("expect APIPath() to return %q, but got %q", expect, namespace)
}
}
func TestRemoteConfigMapDownload(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {

View File

@@ -19,15 +19,30 @@ package kubeletconfig
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
const (
// KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration
KubeletConfigChangedEventReason = "KubeletConfigChanged"
// EventMessageFmt is the message format for Kubelet config change events
EventMessageFmt = "Kubelet will restart to use: %s"
// LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults)
LocalConfigMessage = "local config"
)
// pokeConfiSourceWorker tells the worker thread that syncs config sources that work needs to be done
func (cc *Controller) pokeConfigSourceWorker() {
select {
@@ -37,7 +52,7 @@ func (cc *Controller) pokeConfigSourceWorker() {
}
// syncConfigSource checks if work needs to be done to use a new configuration, and does that work if necessary
func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName string) {
func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
select {
case <-cc.pendingConfigSource:
default:
@@ -62,13 +77,22 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri
}
// check the Node and download any new config
if updated, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil {
if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil {
cc.configOK.SetFailSyncCondition(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
// TODO(mtaufen): Consider adding a "currently restarting kubelet" ConfigOK message for this case
utillog.Infof("config updated, Kubelet will restart to begin using new config")
path := LocalConfigMessage
if cur != nil {
path = cur.APIPath()
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
os.Exit(0)
}
@@ -81,31 +105,31 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri
// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,
// depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result
func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, string, error) {
func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) {
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults")
updated, reason, err := cc.resetConfig()
if err != nil {
return false, reason, err
return false, nil, reason, err
}
return updated, "", nil
return updated, nil, "", nil
}
// if the NodeConfigSource is non-nil, download the config
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
return false, reason, err
return false, nil, reason, err
}
reason, err = cc.checkpointConfigSource(client, remote)
if err != nil {
return false, reason, err
return false, nil, reason, err
}
updated, reason, err := cc.setCurrentConfig(remote)
if err != nil {
return false, reason, err
return false, nil, reason, err
}
return updated, "", nil
return updated, remote, "", nil
}
// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist,
@@ -181,3 +205,43 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
}
return node, nil
}
// eventf constructs and returns an event containing a formatted message
// similar to k8s.io/client-go/tools/record/event.go
func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event {
return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...))
}
// makeEvent constructs an event
// similar to makeEvent in k8s.io/client-go/tools/record/event.go
func makeEvent(nodeName, eventtype, reason, message string) *apiv1.Event {
const componentKubelet = "kubelet"
// NOTE(mtaufen): This is consistent with pkg/kubelet/kubelet.go. Even though setting the node
// name as the UID looks strange, it appears to be conventional for events sent by the Kubelet.
ref := apiv1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
t := metav1.Time{Time: time.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &apiv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
},
InvolvedObject: ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
Source: apiv1.EventSource{Component: componentKubelet, Host: string(nodeName)},
}
}

View File

@@ -24,10 +24,10 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
@@ -200,7 +200,7 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
// StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty.
// It will always start the Node condition reporting loop, and will also start the dynamic conifg sync loops
// if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged.
func (cc *Controller) StartSync(client clientset.Interface, nodeName string) {
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
if client == nil {
utillog.Infof("nil client, will not start sync loops")
return
@@ -235,7 +235,7 @@ func (cc *Controller) StartSync(client clientset.Interface, nodeName string) {
go utilpanic.HandlePanic(func() {
utillog.Infof("starting config source sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, nodeName)
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
} else {