From 4da0e64bc1b9c23b71589ff38e875ff67e8f649e Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 27 May 2020 11:44:14 -0400 Subject: [PATCH] reduce race risk in kubelet for missing KUBERNETES_SERVICE_HOST --- pkg/kubelet/kubelet.go | 21 ++++- pkg/kubelet/kubelet_pods.go | 12 +++ pkg/kubelet/kubelet_pods_test.go | 156 +++++++++++++++++++------------ pkg/kubelet/kubelet_test.go | 1 + 4 files changed, 125 insertions(+), 65 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9315f4915fb..e2d52032d55 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -31,6 +31,8 @@ import ( "sync/atomic" "time" + "k8s.io/client-go/informers" + cadvisorapi "github.com/google/cadvisor/info/v1" utilexec "k8s.io/utils/exec" "k8s.io/utils/integer" @@ -434,13 +436,18 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(), } - serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + var serviceLister corelisters.ServiceLister + var serviceHasSynced cache.InformerSynced if kubeDeps.KubeClient != nil { - serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything()) - r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0) - go r.Run(wait.NeverStop) + kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0) + serviceLister = kubeInformers.Core().V1().Services().Lister() + serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced + kubeInformers.Start(wait.NeverStop) + } else { + serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceLister = corelisters.NewServiceLister(serviceIndexer) + serviceHasSynced = func() bool { return true } } - serviceLister := corelisters.NewServiceLister(serviceIndexer) nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if kubeDeps.KubeClient != nil { @@ -500,6 +507,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, registerSchedulable: registerSchedulable, dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig), serviceLister: serviceLister, + serviceHasSynced: serviceHasSynced, nodeLister: nodeLister, masterServiceNamespace: masterServiceNamespace, streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, @@ -885,6 +893,9 @@ type Kubelet struct { masterServiceNamespace string // serviceLister knows how to list services serviceLister serviceLister + // serviceHasSynced indicates whether services have been sync'd at least once. + // Check this before trusting a response from the lister. + serviceHasSynced cache.InformerSynced // nodeLister knows how to list nodes nodeLister corelisters.NodeLister diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index d70acf1d803..51da1481c71 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -553,6 +553,18 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container return nil, fmt.Errorf("nil pod.spec.enableServiceLinks encountered, cannot construct envvars") } + // If the pod originates from the kube-api, when we know that the kube-apiserver is responding and the kubelet's credentials are valid. + // Knowing this, it is reasonable to wait until the service lister has synchronized at least once before attempting to build + // a service env var map. This doesn't present the race below from happening entirely, but it does prevent the "obvious" + // failure case of services simply not having completed a list operation that can reasonably be expected to succeed. + // One common case this prevents is a kubelet restart reading pods before services and some pod not having the + // KUBERNETES_SERVICE_HOST injected because we didn't wait a short time for services to sync before proceeding. + // The KUBERNETES_SERVICE_HOST link is special because it is unconditionally injected into pods and is read by the + // in-cluster-config for pod clients + if !kubetypes.IsStaticPod(pod) && !kl.serviceHasSynced() { + return nil, fmt.Errorf("services have not yet been read at least once, cannot construct envvars") + } + var result []kubecontainer.EnvVar // Note: These are added to the docker Config, but are not included in the checksum computed // by kubecontainer.HashContainer(...). That way, we can still determine whether an diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 5886f49d6a9..6643c836e8b 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -48,6 +48,7 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/remotecommand" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/volume/util/hostutil" "k8s.io/kubernetes/pkg/volume/util/subpath" ) @@ -454,12 +455,36 @@ func TestMakeEnvironmentVariables(t *testing.T) { container *v1.Container // the container to use masterServiceNs string // the namespace to read master service info from nilLister bool // whether the lister should be nil + staticPod bool // whether the pod should be a static pod (versus an API pod) + unsyncedServices bool // whether the services should NOT be synced configMap *v1.ConfigMap // an optional ConfigMap to pull from secret *v1.Secret // an optional Secret to pull from expectedEnvs []kubecontainer.EnvVar // a set of expected environment vars expectedError bool // does the test fail expectedEvent string // does the test emit an event }{ + { + name: "if services aren't synced, non-static pods should fail", + ns: "test1", + enableServiceLinks: &falseValue, + container: &v1.Container{Env: []v1.EnvVar{}}, + masterServiceNs: metav1.NamespaceDefault, + nilLister: false, + staticPod: false, + unsyncedServices: true, + expectedEnvs: []kubecontainer.EnvVar{}, + expectedError: true, + }, + { + name: "if services aren't synced, static pods should succeed", // if there is no service + ns: "test1", + enableServiceLinks: &falseValue, + container: &v1.Container{Env: []v1.EnvVar{}}, + masterServiceNs: metav1.NamespaceDefault, + nilLister: false, + staticPod: true, + unsyncedServices: true, + }, { name: "api server = Y, kubelet = Y", ns: "test1", @@ -1606,71 +1631,82 @@ func TestMakeEnvironmentVariables(t *testing.T) { } for _, tc := range testCases { - fakeRecorder := record.NewFakeRecorder(1) - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - testKubelet.kubelet.recorder = fakeRecorder - defer testKubelet.Cleanup() - kl := testKubelet.kubelet - kl.masterServiceNamespace = tc.masterServiceNs - if tc.nilLister { - kl.serviceLister = nil - } else { - kl.serviceLister = testServiceLister{services} - } + t.Run(tc.name, func(t *testing.T) { + fakeRecorder := record.NewFakeRecorder(1) + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + testKubelet.kubelet.recorder = fakeRecorder + defer testKubelet.Cleanup() + kl := testKubelet.kubelet + kl.masterServiceNamespace = tc.masterServiceNs + if tc.nilLister { + kl.serviceLister = nil + } else if tc.unsyncedServices { + kl.serviceLister = testServiceLister{} + kl.serviceHasSynced = func() bool { return false } + } else { + kl.serviceLister = testServiceLister{services} + kl.serviceHasSynced = func() bool { return true } + } - testKubelet.fakeKubeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { - var err error - if tc.configMap == nil { - err = apierrors.NewNotFound(action.GetResource().GroupResource(), "configmap-name") + testKubelet.fakeKubeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + var err error + if tc.configMap == nil { + err = apierrors.NewNotFound(action.GetResource().GroupResource(), "configmap-name") + } + return true, tc.configMap, err + }) + testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { + var err error + if tc.secret == nil { + err = apierrors.NewNotFound(action.GetResource().GroupResource(), "secret-name") + } + return true, tc.secret, err + }) + + testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { + var err error + if tc.secret == nil { + err = errors.New("no secret defined") + } + return true, tc.secret, err + }) + + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: tc.ns, + Name: "dapi-test-pod-name", + Annotations: map[string]string{}, + }, + Spec: v1.PodSpec{ + ServiceAccountName: "special", + NodeName: "node-name", + EnableServiceLinks: tc.enableServiceLinks, + }, } - return true, tc.configMap, err - }) - testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { - var err error - if tc.secret == nil { - err = apierrors.NewNotFound(action.GetResource().GroupResource(), "secret-name") + podIP := "1.2.3.4" + podIPs := []string{"1.2.3.4,fd00::6"} + if tc.staticPod { + testPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" + } + + result, err := kl.makeEnvironmentVariables(testPod, tc.container, podIP, podIPs) + select { + case e := <-fakeRecorder.Events: + assert.Equal(t, tc.expectedEvent, e) + default: + assert.Equal(t, "", tc.expectedEvent) + } + if tc.expectedError { + assert.Error(t, err, tc.name) + } else { + assert.NoError(t, err, "[%s]", tc.name) + + sort.Sort(envs(result)) + sort.Sort(envs(tc.expectedEnvs)) + assert.Equal(t, tc.expectedEnvs, result, "[%s] env entries", tc.name) } - return true, tc.secret, err }) - testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { - var err error - if tc.secret == nil { - err = errors.New("no secret defined") - } - return true, tc.secret, err - }) - - testPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: tc.ns, - Name: "dapi-test-pod-name", - }, - Spec: v1.PodSpec{ - ServiceAccountName: "special", - NodeName: "node-name", - EnableServiceLinks: tc.enableServiceLinks, - }, - } - podIP := "1.2.3.4" - podIPs := []string{"1.2.3.4,fd00::6"} - - result, err := kl.makeEnvironmentVariables(testPod, tc.container, podIP, podIPs) - select { - case e := <-fakeRecorder.Events: - assert.Equal(t, tc.expectedEvent, e) - default: - assert.Equal(t, "", tc.expectedEvent) - } - if tc.expectedError { - assert.Error(t, err, tc.name) - } else { - assert.NoError(t, err, "[%s]", tc.name) - - sort.Sort(envs(result)) - sort.Sort(envs(tc.expectedEnvs)) - assert.Equal(t, tc.expectedEnvs, result, "[%s] env entries", tc.name) - } } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f85cdc1aee5..b22a286c6a9 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -180,6 +180,7 @@ func newTestKubeletWithImageList( kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true }) kubelet.masterServiceNamespace = metav1.NamespaceDefault kubelet.serviceLister = testServiceLister{} + kubelet.serviceHasSynced = func() bool { return true } kubelet.nodeLister = testNodeLister{ nodes: []*v1.Node{ {