mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 13:45:06 +00:00
Merge pull request #14950 from yujuhong/sync_type
Auto commit by PR queue bot
This commit is contained in:
@@ -30,7 +30,7 @@ type fakePodWorkers struct {
|
|||||||
t TestingInterface
|
t TestingInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
|
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) {
|
||||||
pods, err := f.runtimeCache.GetPods()
|
pods, err := f.runtimeCache.GetPods()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.t.Errorf("Unexpected error: %v", err)
|
f.t.Errorf("Unexpected error: %v", err)
|
||||||
|
@@ -1304,9 +1304,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
|||||||
// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
|
// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
|
||||||
// state of a newly started container with the apiserver before the kubelet restarted, so
|
// state of a newly started container with the apiserver before the kubelet restarted, so
|
||||||
// it's OK to pretend like the kubelet started them after it restarted.
|
// it's OK to pretend like the kubelet started them after it restarted.
|
||||||
//
|
|
||||||
// Also note that deletes currently have an updateType of `create` set in UpdatePods.
|
|
||||||
// This, again, does not matter because deletes are not processed by this method.
|
|
||||||
|
|
||||||
var podStatus api.PodStatus
|
var podStatus api.PodStatus
|
||||||
if updateType == SyncPodCreate {
|
if updateType == SyncPodCreate {
|
||||||
@@ -1952,7 +1949,7 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *a
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Run the sync in an async worker.
|
// Run the sync in an async worker.
|
||||||
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
|
kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() {
|
||||||
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
|
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
|
||||||
})
|
})
|
||||||
// Note the number of containers for new pods.
|
// Note the number of containers for new pods.
|
||||||
|
@@ -63,28 +63,6 @@ type podManager interface {
|
|||||||
mirrorClient
|
mirrorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncPodType classifies pod updates, eg: create, update.
|
|
||||||
type SyncPodType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
SyncPodSync SyncPodType = iota
|
|
||||||
SyncPodUpdate
|
|
||||||
SyncPodCreate
|
|
||||||
)
|
|
||||||
|
|
||||||
func (sp SyncPodType) String() string {
|
|
||||||
switch sp {
|
|
||||||
case SyncPodCreate:
|
|
||||||
return "create"
|
|
||||||
case SyncPodUpdate:
|
|
||||||
return "update"
|
|
||||||
case SyncPodSync:
|
|
||||||
return "sync"
|
|
||||||
default:
|
|
||||||
return "unknown"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All maps in basicPodManager should be set by calling UpdatePods();
|
// All maps in basicPodManager should be set by calling UpdatePods();
|
||||||
// individual arrays/maps are not immutable and no other methods should attempt
|
// individual arrays/maps are not immutable and no other methods should attempt
|
||||||
// to modify them.
|
// to modify them.
|
||||||
|
@@ -30,7 +30,7 @@ import (
|
|||||||
|
|
||||||
// PodWorkers is an abstract interface for testability.
|
// PodWorkers is an abstract interface for testability.
|
||||||
type PodWorkers interface {
|
type PodWorkers interface {
|
||||||
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
|
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func())
|
||||||
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
|
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
|
||||||
ForgetWorker(uid types.UID)
|
ForgetWorker(uid types.UID)
|
||||||
}
|
}
|
||||||
@@ -121,19 +121,11 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
|
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
|
||||||
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
|
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType SyncPodType, updateComplete func()) {
|
||||||
uid := pod.UID
|
uid := pod.UID
|
||||||
var podUpdates chan workUpdate
|
var podUpdates chan workUpdate
|
||||||
var exists bool
|
var exists bool
|
||||||
|
|
||||||
// TODO: Pipe this through from the kubelet. Currently kubelets operating with
|
|
||||||
// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates
|
|
||||||
// and deletes as SET operations, which makes updates indistinguishable from
|
|
||||||
// creates. The intent here is to communicate to the pod worker that it can take
|
|
||||||
// certain liberties, like skipping status generation, when it receives a create
|
|
||||||
// event for a pod.
|
|
||||||
updateType := SyncPodUpdate
|
|
||||||
|
|
||||||
p.podLock.Lock()
|
p.podLock.Lock()
|
||||||
defer p.podLock.Unlock()
|
defer p.podLock.Unlock()
|
||||||
if podUpdates, exists = p.podUpdates[uid]; !exists {
|
if podUpdates, exists = p.podUpdates[uid]; !exists {
|
||||||
@@ -148,7 +140,6 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
|
|||||||
// kubelet just restarted. In either case the kubelet is willing to believe
|
// kubelet just restarted. In either case the kubelet is willing to believe
|
||||||
// the status of the pod for the first pod worker sync. See corresponding
|
// the status of the pod for the first pod worker sync. See corresponding
|
||||||
// comment in syncPod.
|
// comment in syncPod.
|
||||||
updateType = SyncPodCreate
|
|
||||||
go func() {
|
go func() {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
p.managePodLoop(podUpdates)
|
p.managePodLoop(podUpdates)
|
||||||
|
@@ -31,7 +31,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func newPod(uid, name string) *api.Pod {
|
func newPod(uid, name string) *api.Pod {
|
||||||
@@ -94,7 +93,7 @@ func TestUpdatePod(t *testing.T) {
|
|||||||
numPods := 20
|
numPods := 20
|
||||||
for i := 0; i < numPods; i++ {
|
for i := 0; i < numPods; i++ {
|
||||||
for j := i; j < numPods; j++ {
|
for j := i; j < numPods; j++ {
|
||||||
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, func() {})
|
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, SyncPodCreate, func() {})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drainWorkers(podWorkers, numPods)
|
drainWorkers(podWorkers, numPods)
|
||||||
@@ -122,44 +121,12 @@ func TestUpdatePod(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateType(t *testing.T) {
|
|
||||||
syncType := make(chan SyncPodType)
|
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
|
||||||
podWorkers := newPodWorkers(
|
|
||||||
createFakeRuntimeCache(fakeRecorder),
|
|
||||||
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
|
|
||||||
func() {
|
|
||||||
syncType <- updateType
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
fakeRecorder,
|
|
||||||
)
|
|
||||||
cases := map[*api.Pod][]SyncPodType{
|
|
||||||
newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate},
|
|
||||||
newPod("u2", "n1"): {SyncPodCreate},
|
|
||||||
}
|
|
||||||
for p, expectedTypes := range cases {
|
|
||||||
for i := range expectedTypes {
|
|
||||||
podWorkers.UpdatePod(p, nil, func() {})
|
|
||||||
select {
|
|
||||||
case gotType := <-syncType:
|
|
||||||
if gotType != expectedTypes[i] {
|
|
||||||
t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID)
|
|
||||||
}
|
|
||||||
case <-time.After(util.ForeverTestTimeout):
|
|
||||||
t.Errorf("Unexpected delay is running pod worker")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestForgetNonExistingPodWorkers(t *testing.T) {
|
func TestForgetNonExistingPodWorkers(t *testing.T) {
|
||||||
podWorkers, _ := createPodWorkers()
|
podWorkers, _ := createPodWorkers()
|
||||||
|
|
||||||
numPods := 20
|
numPods := 20
|
||||||
for i := 0; i < numPods; i++ {
|
for i := 0; i < numPods; i++ {
|
||||||
podWorkers.UpdatePod(newPod(string(i), "name"), nil, func() {})
|
podWorkers.UpdatePod(newPod(string(i), "name"), nil, SyncPodUpdate, func() {})
|
||||||
}
|
}
|
||||||
drainWorkers(podWorkers, numPods)
|
drainWorkers(podWorkers, numPods)
|
||||||
|
|
||||||
@@ -386,8 +353,8 @@ func TestFakePodWorkers(t *testing.T) {
|
|||||||
kubeletForRealWorkers.wg.Add(1)
|
kubeletForRealWorkers.wg.Add(1)
|
||||||
|
|
||||||
fakeDocker.ContainerList = tt.containerList
|
fakeDocker.ContainerList = tt.containerList
|
||||||
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
|
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {})
|
||||||
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
|
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, SyncPodUpdate, func() {})
|
||||||
|
|
||||||
kubeletForRealWorkers.wg.Wait()
|
kubeletForRealWorkers.wg.Wait()
|
||||||
|
|
||||||
|
@@ -88,3 +88,25 @@ func GetValidatedSources(sources []string) ([]string, error) {
|
|||||||
}
|
}
|
||||||
return validated, nil
|
return validated, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncPodType classifies pod updates, eg: create, update.
|
||||||
|
type SyncPodType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
SyncPodSync SyncPodType = iota
|
||||||
|
SyncPodUpdate
|
||||||
|
SyncPodCreate
|
||||||
|
)
|
||||||
|
|
||||||
|
func (sp SyncPodType) String() string {
|
||||||
|
switch sp {
|
||||||
|
case SyncPodCreate:
|
||||||
|
return "create"
|
||||||
|
case SyncPodUpdate:
|
||||||
|
return "update"
|
||||||
|
case SyncPodSync:
|
||||||
|
return "sync"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user