mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 10:00:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			356 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			356 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2014 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package kubelet
 | |
| 
 | |
| import (
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	"k8s.io/apimachinery/pkg/util/clock"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | |
| 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 | |
| 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | |
| )
 | |
| 
 | |
| // fakePodWorkers runs sync pod function in serial, so we can have
 | |
| // deterministic behaviour in testing.
 | |
| type fakePodWorkers struct {
 | |
| 	syncPodFn syncPodFnType
 | |
| 	cache     kubecontainer.Cache
 | |
| 	t         TestingInterface
 | |
| }
 | |
| 
 | |
| func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
 | |
| 	status, err := f.cache.Get(options.Pod.UID)
 | |
| 	if err != nil {
 | |
| 		f.t.Errorf("Unexpected error: %v", err)
 | |
| 	}
 | |
| 	if err := f.syncPodFn(syncPodOptions{
 | |
| 		mirrorPod:      options.MirrorPod,
 | |
| 		pod:            options.Pod,
 | |
| 		podStatus:      status,
 | |
| 		updateType:     options.UpdateType,
 | |
| 		killPodOptions: options.KillPodOptions,
 | |
| 	}); err != nil {
 | |
| 		f.t.Errorf("Unexpected error: %v", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
 | |
| 
 | |
| func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
 | |
| 
 | |
// TestingInterface is the minimal error-reporting surface fakePodWorkers needs.
// *testing.T satisfies it.
type TestingInterface interface {
	// Errorf reports a formatted failure message.
	Errorf(format string, args ...interface{})
}
 | |
| 
 | |
| func newPod(uid, name string) *v1.Pod {
 | |
| 	return &v1.Pod{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			UID:  types.UID(uid),
 | |
| 			Name: name,
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // syncPodRecord is a record of a sync pod call
 | |
| type syncPodRecord struct {
 | |
| 	name       string
 | |
| 	updateType kubetypes.SyncPodType
 | |
| }
 | |
| 
 | |
| func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
 | |
| 	lock := sync.Mutex{}
 | |
| 	processed := make(map[types.UID][]syncPodRecord)
 | |
| 	fakeRecorder := &record.FakeRecorder{}
 | |
| 	fakeRuntime := &containertest.FakeRuntime{}
 | |
| 	fakeCache := containertest.NewFakeCache(fakeRuntime)
 | |
| 	podWorkers := newPodWorkers(
 | |
| 		func(options syncPodOptions) error {
 | |
| 			func() {
 | |
| 				lock.Lock()
 | |
| 				defer lock.Unlock()
 | |
| 				pod := options.pod
 | |
| 				processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
 | |
| 					name:       pod.Name,
 | |
| 					updateType: options.updateType,
 | |
| 				})
 | |
| 			}()
 | |
| 			return nil
 | |
| 		},
 | |
| 		fakeRecorder,
 | |
| 		queue.NewBasicWorkQueue(&clock.RealClock{}),
 | |
| 		time.Second,
 | |
| 		time.Second,
 | |
| 		fakeCache,
 | |
| 	)
 | |
| 	return podWorkers, processed
 | |
| }
 | |
| 
 | |
| func drainWorkers(podWorkers *podWorkers, numPods int) {
 | |
| 	for {
 | |
| 		stillWorking := false
 | |
| 		podWorkers.podLock.Lock()
 | |
| 		for i := 0; i < numPods; i++ {
 | |
| 			if podWorkers.isWorking[types.UID(string(i))] {
 | |
| 				stillWorking = true
 | |
| 			}
 | |
| 		}
 | |
| 		podWorkers.podLock.Unlock()
 | |
| 		if !stillWorking {
 | |
| 			break
 | |
| 		}
 | |
| 		time.Sleep(50 * time.Millisecond)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestUpdatePod(t *testing.T) {
 | |
| 	podWorkers, processed := createPodWorkers()
 | |
| 
 | |
| 	// Check whether all pod updates will be processed.
 | |
| 	numPods := 20
 | |
| 	for i := 0; i < numPods; i++ {
 | |
| 		for j := i; j < numPods; j++ {
 | |
| 			podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 				Pod:        newPod(string(j), string(i)),
 | |
| 				UpdateType: kubetypes.SyncPodCreate,
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 	drainWorkers(podWorkers, numPods)
 | |
| 
 | |
| 	if len(processed) != numPods {
 | |
| 		t.Errorf("Not all pods processed: %v", len(processed))
 | |
| 		return
 | |
| 	}
 | |
| 	for i := 0; i < numPods; i++ {
 | |
| 		uid := types.UID(i)
 | |
| 		if len(processed[uid]) < 1 || len(processed[uid]) > i+1 {
 | |
| 			t.Errorf("Pod %v processed %v times", i, len(processed[uid]))
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		first := 0
 | |
| 		last := len(processed[uid]) - 1
 | |
| 		if processed[uid][first].name != string(0) {
 | |
| 			t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
 | |
| 
 | |
| 		}
 | |
| 		if processed[uid][last].name != string(i) {
 | |
| 			t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
 | |
| 	podWorkers, processed := createPodWorkers()
 | |
| 	numPods := 20
 | |
| 	for i := 0; i < numPods; i++ {
 | |
| 		pod := newPod(string(i), string(i))
 | |
| 		podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        pod,
 | |
| 			UpdateType: kubetypes.SyncPodCreate,
 | |
| 		})
 | |
| 		podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        pod,
 | |
| 			UpdateType: kubetypes.SyncPodKill,
 | |
| 		})
 | |
| 		podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        pod,
 | |
| 			UpdateType: kubetypes.SyncPodUpdate,
 | |
| 		})
 | |
| 	}
 | |
| 	drainWorkers(podWorkers, numPods)
 | |
| 	if len(processed) != numPods {
 | |
| 		t.Errorf("Not all pods processed: %v", len(processed))
 | |
| 		return
 | |
| 	}
 | |
| 	for i := 0; i < numPods; i++ {
 | |
| 		uid := types.UID(i)
 | |
| 		// each pod should be processed two times (create, kill, but not update)
 | |
| 		syncPodRecords := processed[uid]
 | |
| 		if len(syncPodRecords) < 2 {
 | |
| 			t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
 | |
| 			continue
 | |
| 		}
 | |
| 		if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
 | |
| 			t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
 | |
| 		}
 | |
| 		if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
 | |
| 			t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestForgetNonExistingPodWorkers(t *testing.T) {
 | |
| 	podWorkers, _ := createPodWorkers()
 | |
| 
 | |
| 	numPods := 20
 | |
| 	for i := 0; i < numPods; i++ {
 | |
| 		podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        newPod(string(i), "name"),
 | |
| 			UpdateType: kubetypes.SyncPodUpdate,
 | |
| 		})
 | |
| 	}
 | |
| 	drainWorkers(podWorkers, numPods)
 | |
| 
 | |
| 	if len(podWorkers.podUpdates) != numPods {
 | |
| 		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
 | |
| 	}
 | |
| 
 | |
| 	desiredPods := map[types.UID]empty{}
 | |
| 	desiredPods[types.UID(2)] = empty{}
 | |
| 	desiredPods[types.UID(14)] = empty{}
 | |
| 	podWorkers.ForgetNonExistingPodWorkers(desiredPods)
 | |
| 	if len(podWorkers.podUpdates) != 2 {
 | |
| 		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
 | |
| 	}
 | |
| 	if _, exists := podWorkers.podUpdates[types.UID(2)]; !exists {
 | |
| 		t.Errorf("No updates channel for pod 2")
 | |
| 	}
 | |
| 	if _, exists := podWorkers.podUpdates[types.UID(14)]; !exists {
 | |
| 		t.Errorf("No updates channel for pod 14")
 | |
| 	}
 | |
| 
 | |
| 	podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{})
 | |
| 	if len(podWorkers.podUpdates) != 0 {
 | |
| 		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type simpleFakeKubelet struct {
 | |
| 	pod       *v1.Pod
 | |
| 	mirrorPod *v1.Pod
 | |
| 	podStatus *kubecontainer.PodStatus
 | |
| 	wg        sync.WaitGroup
 | |
| }
 | |
| 
 | |
| func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
 | |
| 	kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
 | |
| 	kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
 | |
| 	kl.wg.Done()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // byContainerName sort the containers in a running pod by their names.
 | |
| type byContainerName kubecontainer.Pod
 | |
| 
 | |
| func (b byContainerName) Len() int { return len(b.Containers) }
 | |
| 
 | |
| func (b byContainerName) Swap(i, j int) {
 | |
| 	b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i]
 | |
| }
 | |
| 
 | |
| func (b byContainerName) Less(i, j int) bool {
 | |
| 	return b.Containers[i].Name < b.Containers[j].Name
 | |
| }
 | |
| 
 | |
| // TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
 | |
| // for their invocation of the syncPodFn.
 | |
| func TestFakePodWorkers(t *testing.T) {
 | |
| 	fakeRecorder := &record.FakeRecorder{}
 | |
| 	fakeRuntime := &containertest.FakeRuntime{}
 | |
| 	fakeCache := containertest.NewFakeCache(fakeRuntime)
 | |
| 
 | |
| 	kubeletForRealWorkers := &simpleFakeKubelet{}
 | |
| 	kubeletForFakeWorkers := &simpleFakeKubelet{}
 | |
| 
 | |
| 	realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
 | |
| 	fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
 | |
| 
 | |
| 	tests := []struct {
 | |
| 		pod       *v1.Pod
 | |
| 		mirrorPod *v1.Pod
 | |
| 	}{
 | |
| 		{
 | |
| 			&v1.Pod{},
 | |
| 			&v1.Pod{},
 | |
| 		},
 | |
| 		{
 | |
| 			podWithUIDNameNs("12345678", "foo", "new"),
 | |
| 			podWithUIDNameNs("12345678", "fooMirror", "new"),
 | |
| 		},
 | |
| 		{
 | |
| 			podWithUIDNameNs("98765", "bar", "new"),
 | |
| 			podWithUIDNameNs("98765", "barMirror", "new"),
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	for i, tt := range tests {
 | |
| 		kubeletForRealWorkers.wg.Add(1)
 | |
| 		realPodWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        tt.pod,
 | |
| 			MirrorPod:  tt.mirrorPod,
 | |
| 			UpdateType: kubetypes.SyncPodUpdate,
 | |
| 		})
 | |
| 		fakePodWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        tt.pod,
 | |
| 			MirrorPod:  tt.mirrorPod,
 | |
| 			UpdateType: kubetypes.SyncPodUpdate,
 | |
| 		})
 | |
| 
 | |
| 		kubeletForRealWorkers.wg.Wait()
 | |
| 
 | |
| 		if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
 | |
| 			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
 | |
| 		}
 | |
| 
 | |
| 		if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
 | |
| 			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
 | |
| 		}
 | |
| 
 | |
| 		if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
 | |
| 			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
 | |
| func TestKillPodNowFunc(t *testing.T) {
 | |
| 	fakeRecorder := &record.FakeRecorder{}
 | |
| 	podWorkers, processed := createPodWorkers()
 | |
| 	killPodFunc := killPodNow(podWorkers, fakeRecorder)
 | |
| 	pod := newPod("test", "test")
 | |
| 	gracePeriodOverride := int64(0)
 | |
| 	err := killPodFunc(pod, v1.PodStatus{Phase: v1.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
 | |
| 	if err != nil {
 | |
| 		t.Errorf("Unexpected error: %v", err)
 | |
| 	}
 | |
| 	if len(processed) != 1 {
 | |
| 		t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
 | |
| 		return
 | |
| 	}
 | |
| 	syncPodRecords := processed[pod.UID]
 | |
| 	if len(syncPodRecords) != 1 {
 | |
| 		t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
 | |
| 	}
 | |
| 	if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
 | |
| 		t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
 | |
| 	}
 | |
| }
 |