diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 0cfc99c4a95..46bbbe4dc9e 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -753,6 +753,7 @@ type KubeletConfig struct { NodeIP net.IP ContainerRuntimeOptions []kubecontainer.Option HairpinMode string + Options []kubelet.Option } func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { @@ -839,6 +840,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.VolumeStatsAggPeriod, kc.ContainerRuntimeOptions, kc.HairpinMode, + kc.Options, ) if err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ec21595e17c..3334dda9c44 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -146,6 +146,9 @@ type SyncHandler interface { type SourcesReadyFn func(sourcesSeen sets.String) bool +// Option is a functional option type for Kubelet +type Option func(*Kubelet) + // New instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. func NewMainKubelet( @@ -206,6 +209,7 @@ func NewMainKubelet( volumeStatsAggPeriod time.Duration, containerRuntimeOptions []kubecontainer.Option, hairpinMode string, + kubeOptions []Option, ) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) @@ -458,6 +462,12 @@ func NewMainKubelet( klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) klet.sourcesSeen = sets.NewString() + klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() + + // apply functional Option's + for _, opt := range kubeOptions { + opt(klet) + } return klet, nil } @@ -690,6 +700,9 @@ type Kubelet struct { // (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces) // or "none" (do nothing). hairpinMode componentconfig.HairpinMode + + // handlers called during the tryUpdateNodeStatus cycle + setNodeStatusFuncs []func(*api.Node) error } // Validate given node IP belongs to the current host @@ -3002,16 +3015,39 @@ func (kl *Kubelet) recordNodeSchdulableEvent(node *api.Node) { // TODO(madhusudancs): Simplify the logic for setting node conditions and // refactor the node status condtion code out to a different file. func (kl *Kubelet) setNodeStatus(node *api.Node) error { - if err := kl.setNodeAddress(node); err != nil { - return err + for _, f := range kl.setNodeStatusFuncs { + if err := f(node); err != nil { + return err + } } - kl.setNodeStatusInfo(node) - kl.setNodeOODCondition(node) - kl.setNodeReadyCondition(node) - kl.recordNodeSchdulableEvent(node) return nil } +// defaultNodeStatusFuncs is a factory that generates the default set of setNodeStatus funcs +func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error { + // initial set of node status update handlers, can be modified by Option's + withoutError := func(f func(*api.Node)) func(*api.Node) error { + return func(n *api.Node) error { + f(n) + return nil + } + } + return []func(*api.Node) error{ + kl.setNodeAddress, + withoutError(kl.setNodeStatusInfo), + withoutError(kl.setNodeOODCondition), + withoutError(kl.setNodeReadyCondition), + withoutError(kl.recordNodeSchdulableEvent), + } +} + +// SetNodeStatus returns a functional Option that adds the given node status update handler to the Kubelet +func SetNodeStatus(f func(*api.Node) error) Option { + return func(k *Kubelet) { + k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f) + } +} + // FIXME: Why not combine this with container runtime health check? func (kl *Kubelet) isContainerRuntimeVersionCompatible() error { switch kl.GetRuntime().Type() { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 3fb742e1e85..99e41c028d1 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -191,6 +191,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { // Relist period does not affect the tests. kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil) kubelet.clock = fakeClock + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock} }