diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index cb49bee9b84..4fcd0fa34f6 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -760,6 +760,7 @@ type KubeletConfig struct { NodeIP net.IP ContainerRuntimeOptions []kubecontainer.Option HairpinMode string + Options []kubelet.Option } func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { @@ -846,6 +847,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.VolumeStatsAggPeriod, kc.ContainerRuntimeOptions, kc.HairpinMode, + kc.Options, ) if err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 168967c5746..ed7aca8791b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -146,6 +146,9 @@ type SyncHandler interface { type SourcesReadyFn func(sourcesSeen sets.String) bool +// Option is a functional option type for Kubelet +type Option func(*Kubelet) + // New instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. func NewMainKubelet( @@ -206,6 +209,7 @@ func NewMainKubelet( volumeStatsAggPeriod time.Duration, containerRuntimeOptions []kubecontainer.Option, hairpinMode string, + kubeOptions []Option, ) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) @@ -489,6 +493,12 @@ func NewMainKubelet( klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) klet.sourcesSeen = sets.NewString() + klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() + + // apply functional Option's + for _, opt := range kubeOptions { + opt(klet) + } return klet, nil } @@ -721,6 +731,9 @@ type Kubelet struct { // (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces) // or "none" (do nothing). hairpinMode componentconfig.HairpinMode + + // handlers called during the tryUpdateNodeStatus cycle + setNodeStatusFuncs []func(*api.Node) error } // Validate given node IP belongs to the current host @@ -3033,16 +3046,39 @@ func (kl *Kubelet) recordNodeSchdulableEvent(node *api.Node) { // TODO(madhusudancs): Simplify the logic for setting node conditions and // refactor the node status condtion code out to a different file. func (kl *Kubelet) setNodeStatus(node *api.Node) error { - if err := kl.setNodeAddress(node); err != nil { - return err + for _, f := range kl.setNodeStatusFuncs { + if err := f(node); err != nil { + return err + } } - kl.setNodeStatusInfo(node) - kl.setNodeOODCondition(node) - kl.setNodeReadyCondition(node) - kl.recordNodeSchdulableEvent(node) return nil } +// defaultNodeStatusFuncs is a factory that generates the default set of setNodeStatus funcs +func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error { + // initial set of node status update handlers, can be modified by Option's + withoutError := func(f func(*api.Node)) func(*api.Node) error { + return func(n *api.Node) error { + f(n) + return nil + } + } + return []func(*api.Node) error{ + kl.setNodeAddress, + withoutError(kl.setNodeStatusInfo), + withoutError(kl.setNodeOODCondition), + withoutError(kl.setNodeReadyCondition), + withoutError(kl.recordNodeSchdulableEvent), + } +} + +// SetNodeStatus returns a functional Option that adds the given node status update handler to the Kubelet +func SetNodeStatus(f func(*api.Node) error) Option { + return func(k *Kubelet) { + k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f) + } +} + // FIXME: Why not combine this with container runtime health check? func (kl *Kubelet) isContainerRuntimeVersionCompatible() error { switch kl.GetRuntime().Type() { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 692177064e8..5752fd89a5f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -191,6 +191,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { // Relist period does not affect the tests. kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil) kubelet.clock = fakeClock + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock} }