mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Sync all pods with store before starting RC Manager.
This commit is contained in:
parent
7742c6c78e
commit
1b93ee7b35
@ -30,6 +30,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/controller/framework"
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
)
|
)
|
||||||
@ -412,11 +413,48 @@ func (s ActivePods) Less(i, j int) bool {
|
|||||||
func FilterActivePods(pods []api.Pod) []*api.Pod {
|
func FilterActivePods(pods []api.Pod) []*api.Pod {
|
||||||
var result []*api.Pod
|
var result []*api.Pod
|
||||||
for i := range pods {
|
for i := range pods {
|
||||||
if api.PodSucceeded != pods[i].Status.Phase &&
|
p := pods[i]
|
||||||
api.PodFailed != pods[i].Status.Phase &&
|
if api.PodSucceeded != p.Status.Phase &&
|
||||||
pods[i].DeletionTimestamp == nil {
|
api.PodFailed != p.Status.Phase &&
|
||||||
result = append(result, &pods[i])
|
p.DeletionTimestamp == nil {
|
||||||
|
result = append(result, &p)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
|
||||||
|
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncAllPodsWithStore lists all pods and inserts them into the given store.
|
||||||
|
// Though this function is written in a generic manner, it is only used by the
|
||||||
|
// controllers for a specific purpose, to synchronously populate the store
|
||||||
|
// with the first batch of pods that would otherwise be sent by the Informer.
|
||||||
|
// Doing this avoids more complicated forms of synchronization with the
|
||||||
|
// Informer, though it also means that the controller calling this function
|
||||||
|
// will receive "OnUpdate" events for all the pods in the store, instead of
|
||||||
|
// "OnAdd". This should be ok, since most controllers are level triggered
|
||||||
|
// and make decisions based on the contents of the store.
|
||||||
|
//
|
||||||
|
// TODO: Extend this logic to load arbitrary local state for the controllers
|
||||||
|
// instead of just pods.
|
||||||
|
func SyncAllPodsWithStore(kubeClient client.Interface, store cache.Store) {
|
||||||
|
var allPods *api.PodList
|
||||||
|
var err error
|
||||||
|
listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}
|
||||||
|
for {
|
||||||
|
if allPods, err = kubeClient.Pods(api.NamespaceAll).List(listOptions); err != nil {
|
||||||
|
glog.Warningf("Retrying pod list: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
pods := []interface{}{}
|
||||||
|
for i := range allPods.Items {
|
||||||
|
p := allPods.Items[i]
|
||||||
|
glog.V(4).Infof("Initializing store with pod %v/%v", p.Namespace, p.Name)
|
||||||
|
pods = append(pods, &p)
|
||||||
|
}
|
||||||
|
store.Replace(pods, allPods.ResourceVersion)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
@ -182,6 +182,8 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
|
|||||||
// Run begins watching and syncing.
|
// Run begins watching and syncing.
|
||||||
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
|
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
|
glog.Infof("Starting RC Manager")
|
||||||
|
controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store)
|
||||||
go rm.rcController.Run(stopCh)
|
go rm.rcController.Run(stopCh)
|
||||||
go rm.podController.Run(stopCh)
|
go rm.podController.Run(stopCh)
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
|
@ -894,3 +894,27 @@ func TestOverlappingRCs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRCManagerInit(t *testing.T) {
|
||||||
|
// Insert a stable rc into the replication manager's store and make sure
|
||||||
|
// it syncs pods with the apiserver before making any decisions.
|
||||||
|
rc := newReplicationController(2)
|
||||||
|
response := runtime.EncodeOrDie(testapi.Default.Codec(), newPodList(nil, 2, api.PodRunning, rc))
|
||||||
|
fakeHandler := utiltesting.FakeHandler{
|
||||||
|
StatusCode: 200,
|
||||||
|
ResponseBody: response,
|
||||||
|
}
|
||||||
|
testServer := httptest.NewServer(&fakeHandler)
|
||||||
|
// TODO: Uncomment when fix #19254
|
||||||
|
// defer testServer.Close()
|
||||||
|
|
||||||
|
client := client.NewOrDie(&client.Config{Host: testServer.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||||
|
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
|
||||||
|
manager.rcStore.Store.Add(rc)
|
||||||
|
manager.podStoreSynced = alwaysReady
|
||||||
|
controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)
|
||||||
|
fakePodControl := &controller.FakePodControl{}
|
||||||
|
manager.podControl = fakePodControl
|
||||||
|
manager.syncReplicationController(getKey(rc, t))
|
||||||
|
validateSyncReplication(t, fakePodControl, 0, 0)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user