Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-02 00:07:50 +00:00)
Commit 32d844e59c
@@ -31,7 +31,6 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller/framework"
-	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 )
@@ -433,39 +432,6 @@ func FilterActivePods(pods []api.Pod) []*api.Pod {
 	return result
 }
 
-// SyncAllPodsWithStore lists all pods and inserts them into the given store.
-// Though this function is written in a generic manner, it is only used by the
-// controllers for a specific purpose, to synchronously populate the store
-// with the first batch of pods that would otherwise be sent by the Informer.
-// Doing this avoids more complicated forms of synchronization with the
-// Informer, though it also means that the controller calling this function
-// will receive "OnUpdate" events for all the pods in the store, instead of
-// "OnAdd". This should be ok, since most controllers are level triggered
-// and make decisions based on the contents of the store.
-//
-// TODO: Extend this logic to load arbitrary local state for the controllers
-// instead of just pods.
-func SyncAllPodsWithStore(kubeClient clientset.Interface, store cache.Store) {
-	var allPods *api.PodList
-	var err error
-	listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}
-	for {
-		if allPods, err = kubeClient.Core().Pods(api.NamespaceAll).List(listOptions); err != nil {
-			glog.Warningf("Retrying pod list: %v", err)
-			continue
-		}
-		break
-	}
-	pods := []interface{}{}
-	for i := range allPods.Items {
-		p := allPods.Items[i]
-		glog.V(4).Infof("Initializing store with pod %v/%v", p.Namespace, p.Name)
-		pods = append(pods, &p)
-	}
-	store.Replace(pods, allPods.ResourceVersion)
-	return
-}
-
 // ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
 type ControllersByCreationTimestamp []*api.ReplicationController
 
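The doc comment removed in the hunk above carries the reasoning: force-populating the store with a synchronous List avoided extra coordination with the Informer, and because the controllers are level triggered they only act on the current contents of the store. That same property is what makes the removal safe: a controller can instead wait until the informer-backed store reports that its initial List/Watch has completed (the role played by podStoreSynced in the tests further down) before it starts syncing. The following is a minimal sketch of that waiting pattern, not code from this commit; waitForStoreSync, hasSynced, and process are illustrative names only.

package main

import (
	"fmt"
	"time"
)

// waitForStoreSync polls hasSynced until the store reports that its initial
// List/Watch has completed, then runs process once. hasSynced and process are
// hypothetical stand-ins for a controller's podStoreSynced check and its sync
// handler; they are not identifiers from this diff.
func waitForStoreSync(hasSynced func() bool, process func(), interval time.Duration) {
	for !hasSynced() {
		time.Sleep(interval)
	}
	process()
}

func main() {
	start := time.Now()
	// Pretend the store becomes synced roughly 300ms after startup.
	synced := func() bool { return time.Since(start) > 300*time.Millisecond }
	waitForStoreSync(synced, func() { fmt.Println("store synced; processing") }, 50*time.Millisecond)
}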
@@ -187,7 +187,6 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	glog.Infof("Starting Daemon Sets controller manager")
-	controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store)
 	go dsc.dsController.Run(stopCh)
 	go dsc.podController.Run(stopCh)
 	go dsc.nodeController.Run(stopCh)
@@ -472,6 +471,7 @@ func storeDaemonSetStatus(dsClient unversioned_extensions.DaemonSetInterface, ds
 	if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled {
 		return nil
 	}
+
 	var updateErr, getErr error
 	for i := 0; i <= StatusUpdateRetries; i++ {
 		ds.Status.DesiredNumberScheduled = desiredNumberScheduled
@@ -18,7 +18,6 @@ package daemon
 
 import (
 	"fmt"
-	"net/http/httptest"
 	"testing"
 
 	"k8s.io/kubernetes/pkg/api"
@@ -30,9 +29,7 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/controller"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/securitycontext"
-	utiltesting "k8s.io/kubernetes/pkg/util/testing"
 )
 
 var (
@@ -457,39 +454,3 @@ func TestDSManagerNotReady(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
 }
-
-func TestDSManagerInit(t *testing.T) {
-	// Insert a stable daemon set and make sure we don't create an extra pod
-	// for the one node which already has a daemon after a simulated restart.
-	ds := newDaemonSet("test")
-	ds.Status = extensions.DaemonSetStatus{
-		CurrentNumberScheduled: 1,
-		NumberMisscheduled:     0,
-		DesiredNumberScheduled: 1,
-	}
-	nodeName := "only-node"
-	podList := &api.PodList{
-		Items: []api.Pod{
-			*newPod("podname", nodeName, simpleDaemonSetLabel),
-		}}
-	response := runtime.EncodeOrDie(testapi.Default.Codec(), podList)
-	fakeHandler := utiltesting.FakeHandler{
-		StatusCode:   200,
-		ResponseBody: response,
-	}
-	testServer := httptest.NewServer(&fakeHandler)
-	// TODO: Uncomment when fix #19254
-	// defer testServer.Close()
-
-	clientset := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc)
-	manager.dsStore.Add(ds)
-	manager.nodeStore.Add(newNode(nodeName, nil))
-	manager.podStoreSynced = alwaysReady
-	controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)
-
-	fakePodControl := &controller.FakePodControl{}
-	manager.podControl = fakePodControl
-	manager.syncHandler(getKey(ds, t))
-	validateSyncDaemonSets(t, fakePodControl, 0, 0)
-}
@@ -187,7 +187,6 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
 func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	glog.Infof("Starting RC Manager")
-	controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store)
 	go rm.rcController.Run(stopCh)
 	go rm.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
@@ -892,27 +892,3 @@ func TestOverlappingRCs(t *testing.T) {
 		}
 	}
 }
-
-func TestRCManagerInit(t *testing.T) {
-	// Insert a stable rc into the replication manager's store and make sure
-	// it syncs pods with the apiserver before making any decisions.
-	rc := newReplicationController(2)
-	response := runtime.EncodeOrDie(testapi.Default.Codec(), newPodList(nil, 2, api.PodRunning, rc))
-	fakeHandler := utiltesting.FakeHandler{
-		StatusCode:   200,
-		ResponseBody: response,
-	}
-	testServer := httptest.NewServer(&fakeHandler)
-	// TODO: Uncomment when fix #19254
-	// defer testServer.Close()
-
-	c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
-	manager.rcStore.Store.Add(rc)
-	manager.podStoreSynced = alwaysReady
-	controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)
-	fakePodControl := &controller.FakePodControl{}
-	manager.podControl = fakePodControl
-	manager.syncReplicationController(getKey(rc, t))
-	validateSyncReplication(t, fakePodControl, 0, 0)
-}