mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
EndpointSliceTracker should track updated resource version
During EndpointSlice reconcilation, EndpointSliceTracker is supposed to track expected EndpointSlice resource versions so that external changes to them can be detected. But it actually tracked the stale resource version and resulted in every Service was handled twice as it always received an EndpointSlice update with a different resource version but was actually created/updated by itself during the first processing.
This commit is contained in:
parent
acd97b42f3
commit
c2d3e54551
@ -206,7 +206,7 @@ func (r *reconciler) finalize(
|
||||
|
||||
for _, endpointSlice := range slicesToCreate {
|
||||
addTriggerTimeAnnotation(endpointSlice, triggerTime)
|
||||
_, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
createdSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
|
||||
if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
|
||||
@ -214,18 +214,18 @@ func (r *reconciler) finalize(
|
||||
}
|
||||
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
|
||||
} else {
|
||||
r.endpointSliceTracker.Update(endpointSlice)
|
||||
r.endpointSliceTracker.Update(createdSlice)
|
||||
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
|
||||
}
|
||||
}
|
||||
|
||||
for _, endpointSlice := range slicesToUpdate {
|
||||
addTriggerTimeAnnotation(endpointSlice, triggerTime)
|
||||
_, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
|
||||
updatedSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
|
||||
} else {
|
||||
r.endpointSliceTracker.Update(endpointSlice)
|
||||
r.endpointSliceTracker.Update(updatedSlice)
|
||||
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
|
||||
}
|
||||
}
|
||||
|
@ -62,6 +62,7 @@ func TestReconcileEmpty(t *testing.T) {
|
||||
assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
|
||||
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
|
||||
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
|
||||
expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "100")
|
||||
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
|
||||
}
|
||||
|
||||
@ -190,6 +191,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
|
||||
}
|
||||
|
||||
expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100")
|
||||
|
||||
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
|
||||
})
|
||||
}
|
||||
@ -221,6 +224,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
|
||||
assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
|
||||
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
|
||||
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
|
||||
expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "200")
|
||||
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
|
||||
}
|
||||
|
||||
@ -821,6 +825,17 @@ func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, res
|
||||
}
|
||||
}
|
||||
|
||||
func expectTrackedResourceVersion(t *testing.T, tracker *endpointSliceTracker, slice *discovery.EndpointSlice, expectedRV string) {
|
||||
rrv := tracker.relatedResourceVersions(slice)
|
||||
rv, tracked := rrv[slice.Name]
|
||||
if !tracked {
|
||||
t.Fatalf("Expected EndpointSlice %s to be tracked", slice.Name)
|
||||
}
|
||||
if rv != expectedRV {
|
||||
t.Errorf("Expected ResourceVersion of %s to be %s, got %s", slice.Name, expectedRV, rv)
|
||||
}
|
||||
}
|
||||
|
||||
func portsAndAddressTypeEqual(slice1, slice2 discovery.EndpointSlice) bool {
|
||||
return apiequality.Semantic.DeepEqual(slice1.Ports, slice2.Ports) && apiequality.Semantic.DeepEqual(slice1.AddressType, slice2.AddressType)
|
||||
}
|
||||
|
@ -401,9 +401,15 @@ func newClientset() *fake.Clientset {
|
||||
endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
|
||||
endpointSlice.ObjectMeta.GenerateName = ""
|
||||
}
|
||||
endpointSlice.ObjectMeta.ResourceVersion = "100"
|
||||
|
||||
return false, endpointSlice, nil
|
||||
}))
|
||||
client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
|
||||
endpointSlice.ObjectMeta.ResourceVersion = "200"
|
||||
return false, endpointSlice, nil
|
||||
}))
|
||||
|
||||
return client
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user