Merge pull request #91500 from deads2k/guarantee-service

reduce race risk in kubelet for missing KUBERNETES_SERVICE_HOST
This commit is contained in:
Kubernetes Prow Robot 2020-05-29 15:21:33 -07:00 committed by GitHub
commit 1d566466cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 65 deletions

View File

@ -31,6 +31,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"k8s.io/client-go/informers"
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
"k8s.io/utils/integer" "k8s.io/utils/integer"
@ -434,13 +436,18 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(), PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
} }
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) var serviceLister corelisters.ServiceLister
var serviceHasSynced cache.InformerSynced
if kubeDeps.KubeClient != nil { if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything()) kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0) serviceLister = kubeInformers.Core().V1().Services().Lister()
go r.Run(wait.NeverStop) serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
kubeInformers.Start(wait.NeverStop)
} else {
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister = corelisters.NewServiceLister(serviceIndexer)
serviceHasSynced = func() bool { return true }
} }
serviceLister := corelisters.NewServiceLister(serviceIndexer)
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil { if kubeDeps.KubeClient != nil {
@ -498,6 +505,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
registerSchedulable: registerSchedulable, registerSchedulable: registerSchedulable,
dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig), dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
serviceLister: serviceLister, serviceLister: serviceLister,
serviceHasSynced: serviceHasSynced,
nodeLister: nodeLister, nodeLister: nodeLister,
masterServiceNamespace: masterServiceNamespace, masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
@ -881,6 +889,9 @@ type Kubelet struct {
masterServiceNamespace string masterServiceNamespace string
// serviceLister knows how to list services // serviceLister knows how to list services
serviceLister serviceLister serviceLister serviceLister
// serviceHasSynced indicates whether services have been sync'd at least once.
// Check this before trusting a response from the lister.
serviceHasSynced cache.InformerSynced
// nodeLister knows how to list nodes // nodeLister knows how to list nodes
nodeLister corelisters.NodeLister nodeLister corelisters.NodeLister

View File

@ -553,6 +553,18 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
return nil, fmt.Errorf("nil pod.spec.enableServiceLinks encountered, cannot construct envvars") return nil, fmt.Errorf("nil pod.spec.enableServiceLinks encountered, cannot construct envvars")
} }
// If the pod originates from the kube-api, when we know that the kube-apiserver is responding and the kubelet's credentials are valid.
// Knowing this, it is reasonable to wait until the service lister has synchronized at least once before attempting to build
// a service env var map. This doesn't present the race below from happening entirely, but it does prevent the "obvious"
// failure case of services simply not having completed a list operation that can reasonably be expected to succeed.
// One common case this prevents is a kubelet restart reading pods before services and some pod not having the
// KUBERNETES_SERVICE_HOST injected because we didn't wait a short time for services to sync before proceeding.
// The KUBERNETES_SERVICE_HOST link is special because it is unconditionally injected into pods and is read by the
// in-cluster-config for pod clients
if !kubetypes.IsStaticPod(pod) && !kl.serviceHasSynced() {
return nil, fmt.Errorf("services have not yet been read at least once, cannot construct envvars")
}
var result []kubecontainer.EnvVar var result []kubecontainer.EnvVar
// Note: These are added to the docker Config, but are not included in the checksum computed // Note: These are added to the docker Config, but are not included in the checksum computed
// by kubecontainer.HashContainer(...). That way, we can still determine whether an // by kubecontainer.HashContainer(...). That way, we can still determine whether an

View File

@ -48,6 +48,7 @@ import (
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming/remotecommand" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/remotecommand"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/volume/util/hostutil" "k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/subpath" "k8s.io/kubernetes/pkg/volume/util/subpath"
) )
@ -454,12 +455,36 @@ func TestMakeEnvironmentVariables(t *testing.T) {
container *v1.Container // the container to use container *v1.Container // the container to use
masterServiceNs string // the namespace to read master service info from masterServiceNs string // the namespace to read master service info from
nilLister bool // whether the lister should be nil nilLister bool // whether the lister should be nil
staticPod bool // whether the pod should be a static pod (versus an API pod)
unsyncedServices bool // whether the services should NOT be synced
configMap *v1.ConfigMap // an optional ConfigMap to pull from configMap *v1.ConfigMap // an optional ConfigMap to pull from
secret *v1.Secret // an optional Secret to pull from secret *v1.Secret // an optional Secret to pull from
expectedEnvs []kubecontainer.EnvVar // a set of expected environment vars expectedEnvs []kubecontainer.EnvVar // a set of expected environment vars
expectedError bool // does the test fail expectedError bool // does the test fail
expectedEvent string // does the test emit an event expectedEvent string // does the test emit an event
}{ }{
{
name: "if services aren't synced, non-static pods should fail",
ns: "test1",
enableServiceLinks: &falseValue,
container: &v1.Container{Env: []v1.EnvVar{}},
masterServiceNs: metav1.NamespaceDefault,
nilLister: false,
staticPod: false,
unsyncedServices: true,
expectedEnvs: []kubecontainer.EnvVar{},
expectedError: true,
},
{
name: "if services aren't synced, static pods should succeed", // if there is no service
ns: "test1",
enableServiceLinks: &falseValue,
container: &v1.Container{Env: []v1.EnvVar{}},
masterServiceNs: metav1.NamespaceDefault,
nilLister: false,
staticPod: true,
unsyncedServices: true,
},
{ {
name: "api server = Y, kubelet = Y", name: "api server = Y, kubelet = Y",
ns: "test1", ns: "test1",
@ -1606,71 +1631,82 @@ func TestMakeEnvironmentVariables(t *testing.T) {
} }
for _, tc := range testCases { for _, tc := range testCases {
fakeRecorder := record.NewFakeRecorder(1) t.Run(tc.name, func(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) fakeRecorder := record.NewFakeRecorder(1)
testKubelet.kubelet.recorder = fakeRecorder testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() testKubelet.kubelet.recorder = fakeRecorder
kl := testKubelet.kubelet defer testKubelet.Cleanup()
kl.masterServiceNamespace = tc.masterServiceNs kl := testKubelet.kubelet
if tc.nilLister { kl.masterServiceNamespace = tc.masterServiceNs
kl.serviceLister = nil if tc.nilLister {
} else { kl.serviceLister = nil
kl.serviceLister = testServiceLister{services} } else if tc.unsyncedServices {
} kl.serviceLister = testServiceLister{}
kl.serviceHasSynced = func() bool { return false }
} else {
kl.serviceLister = testServiceLister{services}
kl.serviceHasSynced = func() bool { return true }
}
testKubelet.fakeKubeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { testKubelet.fakeKubeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
var err error var err error
if tc.configMap == nil { if tc.configMap == nil {
err = apierrors.NewNotFound(action.GetResource().GroupResource(), "configmap-name") err = apierrors.NewNotFound(action.GetResource().GroupResource(), "configmap-name")
}
return true, tc.configMap, err
})
testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
var err error
if tc.secret == nil {
err = apierrors.NewNotFound(action.GetResource().GroupResource(), "secret-name")
}
return true, tc.secret, err
})
testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
var err error
if tc.secret == nil {
err = errors.New("no secret defined")
}
return true, tc.secret, err
})
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.ns,
Name: "dapi-test-pod-name",
Annotations: map[string]string{},
},
Spec: v1.PodSpec{
ServiceAccountName: "special",
NodeName: "node-name",
EnableServiceLinks: tc.enableServiceLinks,
},
} }
return true, tc.configMap, err podIP := "1.2.3.4"
}) podIPs := []string{"1.2.3.4,fd00::6"}
testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { if tc.staticPod {
var err error testPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
if tc.secret == nil { }
err = apierrors.NewNotFound(action.GetResource().GroupResource(), "secret-name")
result, err := kl.makeEnvironmentVariables(testPod, tc.container, podIP, podIPs)
select {
case e := <-fakeRecorder.Events:
assert.Equal(t, tc.expectedEvent, e)
default:
assert.Equal(t, "", tc.expectedEvent)
}
if tc.expectedError {
assert.Error(t, err, tc.name)
} else {
assert.NoError(t, err, "[%s]", tc.name)
sort.Sort(envs(result))
sort.Sort(envs(tc.expectedEnvs))
assert.Equal(t, tc.expectedEnvs, result, "[%s] env entries", tc.name)
} }
return true, tc.secret, err
}) })
testKubelet.fakeKubeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
var err error
if tc.secret == nil {
err = errors.New("no secret defined")
}
return true, tc.secret, err
})
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.ns,
Name: "dapi-test-pod-name",
},
Spec: v1.PodSpec{
ServiceAccountName: "special",
NodeName: "node-name",
EnableServiceLinks: tc.enableServiceLinks,
},
}
podIP := "1.2.3.4"
podIPs := []string{"1.2.3.4,fd00::6"}
result, err := kl.makeEnvironmentVariables(testPod, tc.container, podIP, podIPs)
select {
case e := <-fakeRecorder.Events:
assert.Equal(t, tc.expectedEvent, e)
default:
assert.Equal(t, "", tc.expectedEvent)
}
if tc.expectedError {
assert.Error(t, err, tc.name)
} else {
assert.NoError(t, err, "[%s]", tc.name)
sort.Sort(envs(result))
sort.Sort(envs(tc.expectedEnvs))
assert.Equal(t, tc.expectedEnvs, result, "[%s] env entries", tc.name)
}
} }
} }

View File

@ -180,6 +180,7 @@ func newTestKubeletWithImageList(
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true }) kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true })
kubelet.masterServiceNamespace = metav1.NamespaceDefault kubelet.masterServiceNamespace = metav1.NamespaceDefault
kubelet.serviceLister = testServiceLister{} kubelet.serviceLister = testServiceLister{}
kubelet.serviceHasSynced = func() bool { return true }
kubelet.nodeLister = testNodeLister{ kubelet.nodeLister = testNodeLister{
nodes: []*v1.Node{ nodes: []*v1.Node{
{ {