From a1f07049e558a957c6692c9b1a749109be451560 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Wed, 15 May 2019 21:40:04 -0700 Subject: [PATCH] Patch service instead of update in service controller Co-authored-by: Josh Horwitz --- pkg/controller/service/BUILD | 8 +- pkg/controller/service/patch.go | 63 ++++++++++++ pkg/controller/service/patch_test.go | 101 +++++++++++++++++++ pkg/controller/service/service_controller.go | 50 ++------- 4 files changed, 179 insertions(+), 43 deletions(-) create mode 100644 pkg/controller/service/patch.go create mode 100644 pkg/controller/service/patch_test.go diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index e679bad8ec2..98b9daf21ea 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -10,6 +10,7 @@ go_library( name = "go_default_library", srcs = [ "doc.go", + "patch.go", "service_controller.go", ], importpath = "k8s.io/kubernetes/pkg/controller/service", @@ -20,8 +21,10 @@ go_library( "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", @@ -39,7 +42,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["service_controller_test.go"], + srcs = [ + "patch_test.go", + "service_controller_test.go", + ], embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", diff --git a/pkg/controller/service/patch.go b/pkg/controller/service/patch.go new file mode 100644 index 00000000000..1364317d182 --- /dev/null +++ b/pkg/controller/service/patch.go @@ -0,0 +1,63 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "encoding/json" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +// patch patches service's Status or ObjectMeta given the origin and +// updated ones. Change to spec will be ignored. +func patch(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) { + // Reset spec to make sure only patch for Status or ObjectMeta. + newSvc.Spec = oldSvc.Spec + + patchBytes, err := getPatchBytes(oldSvc, newSvc) + if err != nil { + return nil, err + } + + updatedSvc, err := c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, fmt.Errorf("failed to patch %q for svc %s/%s: %v", patchBytes, oldSvc.Namespace, oldSvc.Name, err) + } + return updatedSvc, nil +} + +func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) { + oldData, err := json.Marshal(oldSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + + newData, err := json.Marshal(newSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + return patchBytes, nil +} diff --git a/pkg/controller/service/patch_test.go b/pkg/controller/service/patch_test.go new file mode 100644 index 00000000000..df3e3ae175f --- /dev/null +++ b/pkg/controller/service/patch_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func addAnnotations(svc *v1.Service) { + svc.Annotations["foo"] = "bar" +} + +func TestPatch(t *testing.T) { + svcOrigin := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch", + Annotations: map[string]string{}, + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + } + fakeCs := fake.NewSimpleClientset(svcOrigin) + + // Issue a separate update and verify patch doesn't fail after this. + svcToUpdate := svcOrigin.DeepCopy() + addAnnotations(svcToUpdate) + if _, err := fakeCs.CoreV1().Services(svcOrigin.Namespace).Update(svcToUpdate); err != nil { + t.Fatalf("Failed to update service: %v", err) + } + + // Attempt to patch based the original service. + svcToPatch := svcOrigin.DeepCopy() + svcToPatch.Finalizers = []string{"foo"} + svcToPatch.Spec.ClusterIP = "10.0.0.2" + svcToPatch.Status = v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: "8.8.8.8"}, + }, + }, + } + svcPatched, err := patch(fakeCs.CoreV1(), svcOrigin, svcToPatch) + if err != nil { + t.Fatalf("Failed to patch service: %v", err) + } + + // Service returned by patch will contain latest content (e.g from + // the separate update). + addAnnotations(svcToPatch) + if !reflect.DeepEqual(svcPatched, svcToPatch) { + t.Errorf("PatchStatus() = %+v, want %+v", svcPatched, svcToPatch) + } + // Explicitly validate if spec is unchanged from origin. + if !reflect.DeepEqual(svcPatched.Spec, svcOrigin.Spec) { + t.Errorf("Got spec = %+v, want %+v", svcPatched.Spec, svcOrigin.Spec) + } +} + +func TestGetPatchBytes(t *testing.T) { + origin := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch-bytes", + Finalizers: []string{"foo"}, + }, + } + updated := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch-bytes", + Finalizers: []string{"foo", "bar"}, + }, + } + + b, err := getPatchBytes(origin, updated) + if err != nil { + t.Fatal(err) + } + expected := `{"metadata":{"$setElementOrder/finalizers":["foo","bar"],"finalizers":["bar"]}}` + if string(b) != expected { + t.Errorf("getPatchBytes(%+v, %+v) = %s ; want %s", origin, updated, string(b), expected) + } +} diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index 4da3f8027be..68fe82de0f6 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -24,7 +24,7 @@ import ( "reflect" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -309,57 +309,23 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(key string, service *v1.Ser s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer") } - // Write the state if changed - // TODO: Be careful here ... what if there were other changes to the service? + // If there are any changes to the status then patch the service. if !v1helper.LoadBalancerStatusEqual(previousState, newState) { // Make a copy so we don't mutate the shared informer cache - service = service.DeepCopy() + updated := service.DeepCopy() + updated.Status.LoadBalancer = *newState - // Update the status on the copy - service.Status.LoadBalancer = *newState - - if err := s.persistUpdate(service); err != nil { - // TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts. - if errors.IsConflict(err) { - return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err) - } - runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err)) - return nil + if _, err := patch(s.kubeClient.CoreV1(), service, updated); err != nil { + return fmt.Errorf("failed to patch status for service %s: %v", key, err) } + klog.V(4).Infof("Successfully patched status for service %s", key) } else { - klog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key) + klog.V(4).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key) } return nil } -func (s *ServiceController) persistUpdate(service *v1.Service) error { - var err error - for i := 0; i < clientRetryCount; i++ { - _, err = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service) - if err == nil { - return nil - } - // If the object no longer exists, we don't want to recreate it. Just bail - // out so that we can process the delete, which we should soon be receiving - // if we haven't already. - if errors.IsNotFound(err) { - klog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", - service.Namespace, service.Name, err) - return nil - } - // TODO: Try to resolve the conflict if the change was unrelated to load - // balancer status. For now, just pass it up the stack. - if errors.IsConflict(err) { - return err - } - klog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v", - service.Namespace, service.Name, err) - time.Sleep(clientRetryInterval) - } - return err -} - func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) { nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) if err != nil {