mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #3423 from saad-ali/fix3371
Split up kubelet "source seen" logic
This commit is contained in:
commit
f589ee98e0
@ -55,10 +55,6 @@ type PodConfig struct {
|
|||||||
|
|
||||||
// the channel of denormalized changes passed to listeners
|
// the channel of denormalized changes passed to listeners
|
||||||
updates chan kubelet.PodUpdate
|
updates chan kubelet.PodUpdate
|
||||||
|
|
||||||
// contains the list of all configured sources
|
|
||||||
sourcesLock sync.Mutex
|
|
||||||
sources util.StringSet
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPodConfig creates an object that can merge many configuration sources into a stream
|
// NewPodConfig creates an object that can merge many configuration sources into a stream
|
||||||
@ -70,7 +66,6 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
|
|||||||
pods: storage,
|
pods: storage,
|
||||||
mux: config.NewMux(storage),
|
mux: config.NewMux(storage),
|
||||||
updates: updates,
|
updates: updates,
|
||||||
sources: util.StringSet{},
|
|
||||||
}
|
}
|
||||||
return podConfig
|
return podConfig
|
||||||
}
|
}
|
||||||
@ -78,20 +73,15 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
|
|||||||
// Channel creates or returns a config source channel. The channel
|
// Channel creates or returns a config source channel. The channel
|
||||||
// only accepts PodUpdates
|
// only accepts PodUpdates
|
||||||
func (c *PodConfig) Channel(source string) chan<- interface{} {
|
func (c *PodConfig) Channel(source string) chan<- interface{} {
|
||||||
c.sourcesLock.Lock()
|
|
||||||
defer c.sourcesLock.Unlock()
|
|
||||||
c.sources.Insert(source)
|
|
||||||
return c.mux.Channel(source)
|
return c.mux.Channel(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeenAllSources returns true if this config has received a SET
|
func (c *PodConfig) SourceSeen(source string) bool {
|
||||||
// message from all configured sources, false otherwise.
|
|
||||||
func (c *PodConfig) SeenAllSources() bool {
|
|
||||||
if c.pods == nil {
|
if c.pods == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
|
glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen)
|
||||||
return c.pods.seenSources(c.sources.List()...)
|
return c.pods.seenSources(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updates returns a channel of updates to the configuration, properly denormalized.
|
// Updates returns a channel of updates to the configuration, properly denormalized.
|
||||||
|
@ -56,7 +56,7 @@ type SyncHandler interface {
|
|||||||
SyncPods([]api.BoundPod) error
|
SyncPods([]api.BoundPod) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type SourcesReadyFn func() bool
|
type SourceReadyFn func(source string) bool
|
||||||
|
|
||||||
type volumeMap map[string]volume.Interface
|
type volumeMap map[string]volume.Interface
|
||||||
|
|
||||||
@ -73,7 +73,7 @@ func NewMainKubelet(
|
|||||||
pullBurst int,
|
pullBurst int,
|
||||||
minimumGCAge time.Duration,
|
minimumGCAge time.Duration,
|
||||||
maxContainerCount int,
|
maxContainerCount int,
|
||||||
sourcesReady SourcesReadyFn,
|
sourceReady SourceReadyFn,
|
||||||
clusterDomain string,
|
clusterDomain string,
|
||||||
clusterDNS net.IP) (*Kubelet, error) {
|
clusterDNS net.IP) (*Kubelet, error) {
|
||||||
if resyncInterval <= 0 {
|
if resyncInterval <= 0 {
|
||||||
@ -97,7 +97,7 @@ func NewMainKubelet(
|
|||||||
pullBurst: pullBurst,
|
pullBurst: pullBurst,
|
||||||
minimumGCAge: minimumGCAge,
|
minimumGCAge: minimumGCAge,
|
||||||
maxContainerCount: maxContainerCount,
|
maxContainerCount: maxContainerCount,
|
||||||
sourcesReady: sourcesReady,
|
sourceReady: sourceReady,
|
||||||
clusterDomain: clusterDomain,
|
clusterDomain: clusterDomain,
|
||||||
clusterDNS: clusterDNS,
|
clusterDNS: clusterDNS,
|
||||||
}, nil
|
}, nil
|
||||||
@ -116,7 +116,7 @@ type Kubelet struct {
|
|||||||
podWorkers *podWorkers
|
podWorkers *podWorkers
|
||||||
resyncInterval time.Duration
|
resyncInterval time.Duration
|
||||||
pods []api.BoundPod
|
pods []api.BoundPod
|
||||||
sourcesReady SourcesReadyFn
|
sourceReady SourceReadyFn
|
||||||
|
|
||||||
// Needed to report events for containers belonging to deleted/modified pods.
|
// Needed to report events for containers belonging to deleted/modified pods.
|
||||||
// Tracks references for reporting events
|
// Tracks references for reporting events
|
||||||
@ -1062,12 +1062,6 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if !kl.sourcesReady() {
|
|
||||||
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
|
|
||||||
// for sources that haven't reported yet.
|
|
||||||
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Kill any containers we don't need.
|
// Kill any containers we don't need.
|
||||||
for _, container := range dockerContainers {
|
for _, container := range dockerContainers {
|
||||||
// Don't kill containers that are in the desired pods.
|
// Don't kill containers that are in the desired pods.
|
||||||
@ -1076,6 +1070,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
|
|||||||
// syncPod() will handle this one.
|
// syncPod() will handle this one.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
_, _, podAnnotations := ParsePodFullName(podFullName)
|
||||||
|
if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) {
|
||||||
|
// If the source for this container is not ready, skip deletion, so that we don't accidentally
|
||||||
|
// delete containers for sources that haven't reported yet.
|
||||||
|
glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source)
|
||||||
|
continue
|
||||||
|
}
|
||||||
pc := podContainer{podFullName, uuid, containerName}
|
pc := podContainer{podFullName, uuid, containerName}
|
||||||
if _, ok := desiredContainers[pc]; !ok {
|
if _, ok := desiredContainers[pc]; !ok {
|
||||||
glog.V(1).Infof("Killing unwanted container %+v", pc)
|
glog.V(1).Infof("Killing unwanted container %+v", pc)
|
||||||
|
@ -57,7 +57,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
|
|||||||
kubelet.etcdClient = fakeEtcdClient
|
kubelet.etcdClient = fakeEtcdClient
|
||||||
kubelet.rootDirectory = "/tmp/kubelet"
|
kubelet.rootDirectory = "/tmp/kubelet"
|
||||||
kubelet.podWorkers = newPodWorkers()
|
kubelet.podWorkers = newPodWorkers()
|
||||||
kubelet.sourcesReady = func() bool { return true }
|
kubelet.sourceReady = func(source string) bool { return true }
|
||||||
return kubelet, fakeEtcdClient, fakeDocker
|
return kubelet, fakeEtcdClient, fakeDocker
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -617,7 +617,7 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
|
|||||||
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
||||||
ready := false
|
ready := false
|
||||||
kubelet, _, fakeDocker := newTestKubelet(t)
|
kubelet, _, fakeDocker := newTestKubelet(t)
|
||||||
kubelet.sourcesReady = func() bool { return ready }
|
kubelet.sourceReady = func(source string) bool { return ready }
|
||||||
|
|
||||||
fakeDocker.ContainerList = []docker.APIContainers{
|
fakeDocker.ContainerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -657,6 +657,67 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
|
||||||
|
ready := false
|
||||||
|
kubelet, _, fakeDocker := newTestKubelet(t)
|
||||||
|
kubelet.sourceReady = func(source string) bool {
|
||||||
|
if source == "testSource" {
|
||||||
|
return ready
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeDocker.ContainerList = []docker.APIContainers{
|
||||||
|
{
|
||||||
|
// the k8s prefix is required for the kubelet to manage the container
|
||||||
|
Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"},
|
||||||
|
ID: "7492",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// network container
|
||||||
|
Names: []string{"/k8s_net_boo.default.testSource_12345678_42"},
|
||||||
|
ID: "3542",
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
// the k8s prefix is required for the kubelet to manage the container
|
||||||
|
Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"},
|
||||||
|
ID: "1234",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// network container
|
||||||
|
Names: []string{"/k8s_net_foo.new.otherSource_12345678_42"},
|
||||||
|
ID: "9876",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// Validate nothing happened.
|
||||||
|
verifyCalls(t, fakeDocker, []string{"list"})
|
||||||
|
fakeDocker.ClearCalls()
|
||||||
|
|
||||||
|
ready = true
|
||||||
|
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})
|
||||||
|
|
||||||
|
// Validate container for testSource are killed because testSource is reported as seen, but
|
||||||
|
// containers for otherSource are not killed because otherSource has not.
|
||||||
|
expectedToStop := map[string]bool{
|
||||||
|
"7492": true,
|
||||||
|
"3542": true,
|
||||||
|
"1234": false,
|
||||||
|
"9876": false,
|
||||||
|
}
|
||||||
|
if len(fakeDocker.Stopped) != 2 ||
|
||||||
|
!expectedToStop[fakeDocker.Stopped[0]] ||
|
||||||
|
!expectedToStop[fakeDocker.Stopped[1]] {
|
||||||
|
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncPodsDeletes(t *testing.T) {
|
func TestSyncPodsDeletes(t *testing.T) {
|
||||||
kubelet, _, fakeDocker := newTestKubelet(t)
|
kubelet, _, fakeDocker := newTestKubelet(t)
|
||||||
fakeDocker.ContainerList = []docker.APIContainers{
|
fakeDocker.ContainerList = []docker.APIContainers{
|
||||||
|
@ -18,8 +18,10 @@ package kubelet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
|
const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
|
||||||
@ -71,3 +73,20 @@ type PodUpdate struct {
|
|||||||
func GetPodFullName(pod *api.BoundPod) string {
|
func GetPodFullName(pod *api.BoundPod) string {
|
||||||
return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
|
return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations.
|
||||||
|
// If the pod full name is invalid, empty strings are returend.
|
||||||
|
func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) {
|
||||||
|
parts := strings.Split(podFullName, ".")
|
||||||
|
expectedNumFields := 3
|
||||||
|
actualNumFields := len(parts)
|
||||||
|
if actualNumFields != expectedNumFields {
|
||||||
|
glog.Warningf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
podName = parts[0]
|
||||||
|
podNamespace = parts[1]
|
||||||
|
podAnnotations = make(map[string]string)
|
||||||
|
podAnnotations[ConfigSourceAnnotationKey] = parts[2]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
44
pkg/kubelet/types_test.go
Normal file
44
pkg/kubelet/types_test.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kubelet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParsePodFullName(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
podFullName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1.default.etcd"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
podName, podNamespace, podAnnotations := ParsePodFullName(podFullName)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
expectedPodName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1"
|
||||||
|
expectedPodNamespace := "default"
|
||||||
|
expectedSource := "etcd"
|
||||||
|
if podName != expectedPodName {
|
||||||
|
t.Errorf("Unexpected PodName. Expected: %q Actual: %q", expectedPodName, podName)
|
||||||
|
}
|
||||||
|
if podNamespace != expectedPodNamespace {
|
||||||
|
t.Errorf("Unexpected PodNamespace. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
|
||||||
|
}
|
||||||
|
if podAnnotations[ConfigSourceAnnotationKey] != expectedSource {
|
||||||
|
t.Errorf("Unexpected PodSource. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -277,7 +277,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
|
|||||||
kc.RegistryBurst,
|
kc.RegistryBurst,
|
||||||
kc.MinimumGCAge,
|
kc.MinimumGCAge,
|
||||||
kc.MaxContainerCount,
|
kc.MaxContainerCount,
|
||||||
pc.SeenAllSources,
|
pc.SourceSeen,
|
||||||
kc.ClusterDomain,
|
kc.ClusterDomain,
|
||||||
net.IP(kc.ClusterDNS))
|
net.IP(kc.ClusterDNS))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user