mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #23035 from xinxiaogang/xnxin-master
Auto commit by PR queue bot
This commit is contained in:
commit
cda4583984
@ -47,6 +47,10 @@ const (
|
||||
// often. Higher numbers = lower CPU/network load; lower numbers =
|
||||
// shorter amount of time before a mistaken endpoint is corrected.
|
||||
FullServiceResyncPeriod = 30 * time.Second
|
||||
|
||||
// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
|
||||
// avoid a hot loop, we'll wait this long between checks.
|
||||
PodStoreSyncedPollPeriod = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
var (
|
||||
@ -98,6 +102,7 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
|
||||
DeleteFunc: e.deletePod,
|
||||
},
|
||||
)
|
||||
e.podStoreSynced = e.podController.HasSynced
|
||||
|
||||
return e
|
||||
}
|
||||
@ -120,6 +125,9 @@ type EndpointController struct {
|
||||
// controllers.
|
||||
serviceController *framework.Controller
|
||||
podController *framework.Controller
|
||||
// podStoreSynced returns true if the pod store has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
podStoreSynced func() bool
|
||||
}
|
||||
|
||||
// Runs e; will not return until stopCh is closed. workers determines how many
|
||||
@ -268,6 +276,15 @@ func (e *EndpointController) syncService(key string) {
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
if !e.podStoreSynced() {
|
||||
// Sleep so we give the pod reflector goroutine a chance to run.
|
||||
time.Sleep(PodStoreSyncedPollPeriod)
|
||||
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
|
||||
e.queue.Add(key)
|
||||
return
|
||||
}
|
||||
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
|
@ -36,6 +36,8 @@ import (
|
||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
||||
)
|
||||
|
||||
var alwaysReady = func() bool { return true }
|
||||
|
||||
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
|
||||
for i := 0; i < nPods+nNotReady; i++ {
|
||||
p := &api.Pod{
|
||||
@ -107,6 +109,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
|
||||
@ -140,6 +143,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
endpoints.checkLeftoverEndpoints()
|
||||
|
||||
if e, a := 1, endpoints.queue.Len(); e != a {
|
||||
@ -169,6 +173,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -211,6 +217,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -250,6 +257,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -288,6 +296,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -326,6 +335,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -368,6 +378,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
@ -409,6 +420,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
@ -429,6 +441,7 @@ func TestSyncEndpointsItems(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
|
||||
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
@ -472,6 +485,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
|
||||
serviceLabels := map[string]string{"foo": "bar"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
@ -533,6 +547,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
||||
// defer testServer.Close()
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
|
||||
serviceLabels := map[string]string{"baz": "blah"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
|
Loading…
Reference in New Issue
Block a user