diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 0ff21c914a8..0fe591abe74 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -11,6 +11,7 @@ go_library( srcs = [ "doc.go", "endpoints_controller.go", + "trigger_time_tracker.go", ], importpath = "k8s.io/kubernetes/pkg/controller/endpoint", deps = [ @@ -39,7 +40,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["endpoints_controller_test.go"], + srcs = [ + "endpoints_controller_test.go", + "trigger_time_tracker_test.go", + ], embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index a92d698330d..b935f78364b 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -101,6 +101,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced + e.triggerTimeTracker = NewTriggerTimeTracker() + return e } @@ -138,6 +140,10 @@ type EndpointController struct { // workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes. workerLoopPeriod time.Duration + + // triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime + // annotation. + triggerTimeTracker *TriggerTimeTracker } // Run will not return until stopCh is closed. workers determines how many @@ -399,6 +405,7 @@ func (e *EndpointController) syncService(key string) error { if err != nil && !errors.IsNotFound(err) { return err } + e.triggerTimeTracker.DeleteEndpoints(namespace, name) return nil } @@ -427,6 +434,12 @@ func (e *EndpointController) syncService(key string) error { } } + // We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger + // time tracker gets updated even if the sync turns out to be no-op and we don't update the + // endpoints object. + endpointsLastChangeTriggerTime := e.triggerTimeTracker. + ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods) + subsets := []v1.EndpointSubset{} var totalReadyEps int var totalNotReadyEps int @@ -506,6 +519,11 @@ func (e *EndpointController) syncService(key string) error { newEndpoints.Annotations = make(map[string]string) } + if !endpointsLastChangeTriggerTime.IsZero() { + newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] = + endpointsLastChangeTriggerTime.Format(time.RFC3339Nano) + } + klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) if createEndpoints { // No previous endpoints, create them diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index b86e546eebb..6818903e9cf 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -44,6 +44,9 @@ import ( var alwaysReady = func() bool { return true } var neverReady = func() bool { return false } var emptyNodeName string +var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) +var triggerTimeString = triggerTime.Format(time.RFC3339Nano) +var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { for i := 0; i < nPods+nNotReady; i++ { @@ -1175,3 +1178,94 @@ func TestDetermineNeededServiceUpdates(t *testing.T) { } } } + +func TestLastTriggerChangeTimeAnnotation(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, + }}, + }) + addPods(endpoints.podStore, ns, 1, 1, 0) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}}, + }, + }) + endpoints.syncService(ns + "/foo") + + endpointsHandler.ValidateRequestCount(t, 1) + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Annotations: map[string]string{ + v1.EndpointsLastChangeTriggerTime: triggerTimeString, + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + +func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Annotations: map[string]string{ + v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString, + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, + }}, + }) + addPods(endpoints.podStore, ns, 1, 1, 0) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}}, + }, + }) + endpoints.syncService(ns + "/foo") + + endpointsHandler.ValidateRequestCount(t, 1) + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Annotations: map[string]string{ + v1.EndpointsLastChangeTriggerTime: triggerTimeString, + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} diff --git a/pkg/controller/endpoint/trigger_time_tracker.go b/pkg/controller/endpoint/trigger_time_tracker.go new file mode 100644 index 00000000000..0b2c72a9b7b --- /dev/null +++ b/pkg/controller/endpoint/trigger_time_tracker.go @@ -0,0 +1,163 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "sync" + "time" + + "k8s.io/api/core/v1" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" +) + +// TriggerTimeTracker is a util used to compute the EndpointsLastChangeTriggerTime annotation which +// is exported in the endpoints controller's sync function. +// See the documentation of the EndpointsLastChangeTriggerTime annotation for more details. +// +// Please note that this util may compute a wrong EndpointsLastChangeTriggerTime if a same object +// changes multiple times between two consecutive syncs. We're aware of this limitation but we +// decided to accept it, as fixing it would require a major rewrite of the endpoints controller and +// Informer framework. Such situations, i.e. frequent updates of the same object in a single sync +// period, should be relatively rare and therefore this util should provide a good approximation of +// the EndpointsLastChangeTriggerTime. +// TODO(mm4tt): Implement a more robust mechanism that is not subject to the above limitations. +type TriggerTimeTracker struct { + // endpointsStates is a map, indexed by Endpoints object key, storing the last known Endpoints + // object state observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime + // function. + endpointsStates map[endpointsKey]endpointsState + + // mutex guarding the endpointsStates map. + mutex sync.Mutex +} + +// NewTriggerTimeTracker creates a new instance of the TriggerTimeTracker. +func NewTriggerTimeTracker() *TriggerTimeTracker { + return &TriggerTimeTracker{ + endpointsStates: make(map[endpointsKey]endpointsState), + } +} + +// endpointsKey is a key uniquely identifying an Endpoints object. +type endpointsKey struct { + // namespace, name composing a namespaced name - an unique identifier of every Endpoints object. + namespace, name string +} + +// endpointsState represents a state of an Endpoints object that is known to this util. +type endpointsState struct { + // lastServiceTriggerTime is a service trigger time observed most recently. + lastServiceTriggerTime time.Time + // lastPodTriggerTimes is a map (Pod name -> time) storing the pod trigger times that were + // observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime function. + lastPodTriggerTimes map[string]time.Time +} + +// ComputeEndpointsLastChangeTriggerTime updates the state of the Endpoints object being synced +// and returns the time that should be exported as the EndpointsLastChangeTriggerTime annotation. +// +// If the method returns a 'zero' time the EndpointsLastChangeTriggerTime annotation shouldn't be +// exported. +// +// Please note that this function may compute a wrong EndpointsLastChangeTriggerTime value if the +// same object (pod/service) changes multiple times between two consecutive syncs. +// +// Important: This method is go-routing safe but only when called for different keys. The method +// shouldn't be called concurrently for the same key! This contract is fulfilled in the current +// implementation of the endpoints controller. +func (t *TriggerTimeTracker) ComputeEndpointsLastChangeTriggerTime( + namespace, name string, service *v1.Service, pods []*v1.Pod) time.Time { + + key := endpointsKey{namespace: namespace, name: name} + // As there won't be any concurrent calls for the same key, we need to guard access only to the + // endpointsStates map. + t.mutex.Lock() + state, wasKnown := t.endpointsStates[key] + t.mutex.Unlock() + + // Update the state before returning. + defer func() { + t.mutex.Lock() + t.endpointsStates[key] = state + t.mutex.Unlock() + }() + + // minChangedTriggerTime is the min trigger time of all trigger times that have changed since the + // last sync. + var minChangedTriggerTime time.Time + // TODO(mm4tt): If memory allocation / GC performance impact of recreating map in every call + // turns out to be too expensive, we should consider rewriting this to reuse the existing map. + podTriggerTimes := make(map[string]time.Time) + for _, pod := range pods { + if podTriggerTime := getPodTriggerTime(pod); !podTriggerTime.IsZero() { + podTriggerTimes[pod.Name] = podTriggerTime + if podTriggerTime.After(state.lastPodTriggerTimes[pod.Name]) { + // Pod trigger time has changed since the last sync, update minChangedTriggerTime. + minChangedTriggerTime = min(minChangedTriggerTime, podTriggerTime) + } + } + } + serviceTriggerTime := getServiceTriggerTime(service) + if serviceTriggerTime.After(state.lastServiceTriggerTime) { + // Service trigger time has changed since the last sync, update minChangedTriggerTime. + minChangedTriggerTime = min(minChangedTriggerTime, serviceTriggerTime) + } + + state.lastPodTriggerTimes = podTriggerTimes + state.lastServiceTriggerTime = serviceTriggerTime + + if !wasKnown { + // New Endpoints object / new Service, use Service creationTimestamp. + return service.CreationTimestamp.Time + } else { + // Regular update of the Endpoints object, return min of changed trigger times. + return minChangedTriggerTime + } +} + +// DeleteEndpoints deletes endpoints state stored in this util. +func (t *TriggerTimeTracker) DeleteEndpoints(namespace, name string) { + key := endpointsKey{namespace: namespace, name: name} + t.mutex.Lock() + defer t.mutex.Unlock() + delete(t.endpointsStates, key) +} + +// getPodTriggerTime returns the time of the pod change (trigger) that resulted or will result in +// the endpoints object change. +func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) { + if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil { + triggerTime = readyCondition.LastTransitionTime.Time + } + // TODO(mm4tt): Implement missing cases: deletionTime set, pod label change + return triggerTime +} + +// getServiceTriggerTime returns the time of the service change (trigger) that resulted or will +// result in the endpoints object change. +func getServiceTriggerTime(service *v1.Service) (triggerTime time.Time) { + // TODO(mm4tt): Ideally we should look at service.LastUpdateTime, but such thing doesn't exist. + return service.CreationTimestamp.Time +} + +// min returns minimum of the currentMin and newValue or newValue if the currentMin is not set. +func min(currentMin, newValue time.Time) time.Time { + if currentMin.IsZero() || newValue.Before(currentMin) { + return newValue + } + return currentMin +} diff --git a/pkg/controller/endpoint/trigger_time_tracker_test.go b/pkg/controller/endpoint/trigger_time_tracker_test.go new file mode 100644 index 00000000000..b8ebd68f0a5 --- /dev/null +++ b/pkg/controller/endpoint/trigger_time_tracker_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "runtime" + "testing" + "time" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +var ( + t0 = time.Date(2019, 01, 01, 0, 0, 0, 0, time.UTC) + t1 = t0.Add(time.Second) + t2 = t1.Add(time.Second) + t3 = t2.Add(time.Second) + t4 = t3.Add(time.Second) + t5 = t4.Add(time.Second) + + ns = "ns1" + name = "my-service" +) + +func TestNewService_NoPods(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t2) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service).expect(t2) +} + +func TestNewService_ExistingPods(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t3) + pod1 := createPod(ns, "pod1", t0) + pod2 := createPod(ns, "pod2", t1) + pod3 := createPod(ns, "pod3", t5) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2, pod3). + // Pods were created before service, but trigger time is the time when service was created. + expect(t3) +} + +func TestPodsAdded(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service).expect(t0) + + pod1 := createPod(ns, "pod1", t2) + pod2 := createPod(ns, "pod2", t1) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t1) +} + +func TestPodsUpdated(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + pod1 := createPod(ns, "pod1", t1) + pod2 := createPod(ns, "pod2", t2) + pod3 := createPod(ns, "pod3", t3) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2, pod3).expect(t0) + + pod1 = createPod(ns, "pod1", t5) + pod2 = createPod(ns, "pod2", t4) + // pod3 doesn't change. + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2, pod3).expect(t4) +} + +func TestPodsUpdated_NoOp(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + pod1 := createPod(ns, "pod1", t1) + pod2 := createPod(ns, "pod2", t2) + pod3 := createPod(ns, "pod3", t3) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2, pod3).expect(t0) + + // Nothing has changed. + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2, pod3).expectNil() +} + +func TestPodDeletedThenAdded(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + pod1 := createPod(ns, "pod1", t1) + pod2 := createPod(ns, "pod2", t2) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t0) + + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1).expectNil() + + pod2 = createPod(ns, "pod2", t4) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t4) +} + +func TestServiceDeletedThenAdded(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + pod1 := createPod(ns, "pod1", t1) + pod2 := createPod(ns, "pod2", t2) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t0) + + tester.DeleteEndpoints(ns, name) + + service = createService(ns, name, t3) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t3) +} + +func TestServiceUpdated_NoPodChange(t *testing.T) { + tester := newTester(t) + + service := createService(ns, name, t0) + pod1 := createPod(ns, "pod1", t1) + pod2 := createPod(ns, "pod2", t2) + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expect(t0) + + // service's ports have changed. + service.Spec = v1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}}, + } + + // Currently we're not able to calculate trigger time for service updates, hence the returned + // value is a nil time. + tester.whenComputeEndpointsLastChangeTriggerTime(ns, name, service, pod1, pod2).expectNil() +} + +// ------- Test Utils ------- + +type tester struct { + *TriggerTimeTracker + t *testing.T +} + +func newTester(t *testing.T) *tester { + return &tester{NewTriggerTimeTracker(), t} +} + +func (t *tester) whenComputeEndpointsLastChangeTriggerTime( + namespace, name string, service *v1.Service, pods ...*v1.Pod) subject { + return subject{t.ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods), t.t} +} + +type subject struct { + got time.Time + t *testing.T +} + +func (s subject) expect(expected time.Time) { + s.doExpect(expected) +} + +func (s subject) expectNil() { + s.doExpect(time.Time{}) +} + +func (s subject) doExpect(expected time.Time) { + if s.got != expected { + _, fn, line, _ := runtime.Caller(2) + s.t.Errorf("Wrong trigger time in %s:%d expected %s, got %s", fn, line, expected, s.got) + } +} + +func createPod(namespace, name string, readyTime time.Time) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}, + Status: v1.PodStatus{Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.NewTime(readyTime), + }, + }, + }, + } +} + +func createService(namespace, name string, creationTime time.Time) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + CreationTimestamp: metav1.NewTime(creationTime), + }, + } +}