From a1f07049e558a957c6692c9b1a749109be451560 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Wed, 15 May 2019 21:40:04 -0700 Subject: [PATCH 1/3] Patch service instead of update in service controller Co-authored-by: Josh Horwitz --- pkg/controller/service/BUILD | 8 +- pkg/controller/service/patch.go | 63 ++++++++++++ pkg/controller/service/patch_test.go | 101 +++++++++++++++++++ pkg/controller/service/service_controller.go | 50 ++------- 4 files changed, 179 insertions(+), 43 deletions(-) create mode 100644 pkg/controller/service/patch.go create mode 100644 pkg/controller/service/patch_test.go diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index e679bad8ec2..98b9daf21ea 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -10,6 +10,7 @@ go_library( name = "go_default_library", srcs = [ "doc.go", + "patch.go", "service_controller.go", ], importpath = "k8s.io/kubernetes/pkg/controller/service", @@ -20,8 +21,10 @@ go_library( "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", @@ -39,7 +42,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["service_controller_test.go"], + srcs = [ + "patch_test.go", + "service_controller_test.go", + ], embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", diff --git a/pkg/controller/service/patch.go b/pkg/controller/service/patch.go new file mode 100644 index 00000000000..1364317d182 --- /dev/null +++ b/pkg/controller/service/patch.go @@ -0,0 +1,63 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "encoding/json" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +// patch patches service's Status or ObjectMeta given the origin and +// updated ones. Change to spec will be ignored. +func patch(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) { + // Reset spec to make sure only patch for Status or ObjectMeta. + newSvc.Spec = oldSvc.Spec + + patchBytes, err := getPatchBytes(oldSvc, newSvc) + if err != nil { + return nil, err + } + + updatedSvc, err := c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, fmt.Errorf("failed to patch %q for svc %s/%s: %v", patchBytes, oldSvc.Namespace, oldSvc.Name, err) + } + return updatedSvc, nil +} + +func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) { + oldData, err := json.Marshal(oldSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + + newData, err := json.Marshal(newSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + return patchBytes, nil +} diff --git a/pkg/controller/service/patch_test.go b/pkg/controller/service/patch_test.go new file mode 100644 index 00000000000..df3e3ae175f --- /dev/null +++ b/pkg/controller/service/patch_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func addAnnotations(svc *v1.Service) { + svc.Annotations["foo"] = "bar" +} + +func TestPatch(t *testing.T) { + svcOrigin := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch", + Annotations: map[string]string{}, + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + } + fakeCs := fake.NewSimpleClientset(svcOrigin) + + // Issue a separate update and verify patch doesn't fail after this. + svcToUpdate := svcOrigin.DeepCopy() + addAnnotations(svcToUpdate) + if _, err := fakeCs.CoreV1().Services(svcOrigin.Namespace).Update(svcToUpdate); err != nil { + t.Fatalf("Failed to update service: %v", err) + } + + // Attempt to patch based the original service. + svcToPatch := svcOrigin.DeepCopy() + svcToPatch.Finalizers = []string{"foo"} + svcToPatch.Spec.ClusterIP = "10.0.0.2" + svcToPatch.Status = v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: "8.8.8.8"}, + }, + }, + } + svcPatched, err := patch(fakeCs.CoreV1(), svcOrigin, svcToPatch) + if err != nil { + t.Fatalf("Failed to patch service: %v", err) + } + + // Service returned by patch will contain latest content (e.g from + // the separate update). + addAnnotations(svcToPatch) + if !reflect.DeepEqual(svcPatched, svcToPatch) { + t.Errorf("PatchStatus() = %+v, want %+v", svcPatched, svcToPatch) + } + // Explicitly validate if spec is unchanged from origin. + if !reflect.DeepEqual(svcPatched.Spec, svcOrigin.Spec) { + t.Errorf("Got spec = %+v, want %+v", svcPatched.Spec, svcOrigin.Spec) + } +} + +func TestGetPatchBytes(t *testing.T) { + origin := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch-bytes", + Finalizers: []string{"foo"}, + }, + } + updated := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-patch-bytes", + Finalizers: []string{"foo", "bar"}, + }, + } + + b, err := getPatchBytes(origin, updated) + if err != nil { + t.Fatal(err) + } + expected := `{"metadata":{"$setElementOrder/finalizers":["foo","bar"],"finalizers":["bar"]}}` + if string(b) != expected { + t.Errorf("getPatchBytes(%+v, %+v) = %s ; want %s", origin, updated, string(b), expected) + } +} diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index 4da3f8027be..68fe82de0f6 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -24,7 +24,7 @@ import ( "reflect" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -309,57 +309,23 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(key string, service *v1.Ser s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer") } - // Write the state if changed - // TODO: Be careful here ... what if there were other changes to the service? + // If there are any changes to the status then patch the service. if !v1helper.LoadBalancerStatusEqual(previousState, newState) { // Make a copy so we don't mutate the shared informer cache - service = service.DeepCopy() + updated := service.DeepCopy() + updated.Status.LoadBalancer = *newState - // Update the status on the copy - service.Status.LoadBalancer = *newState - - if err := s.persistUpdate(service); err != nil { - // TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts. - if errors.IsConflict(err) { - return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err) - } - runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err)) - return nil + if _, err := patch(s.kubeClient.CoreV1(), service, updated); err != nil { + return fmt.Errorf("failed to patch status for service %s: %v", key, err) } + klog.V(4).Infof("Successfully patched status for service %s", key) } else { - klog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key) + klog.V(4).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key) } return nil } -func (s *ServiceController) persistUpdate(service *v1.Service) error { - var err error - for i := 0; i < clientRetryCount; i++ { - _, err = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service) - if err == nil { - return nil - } - // If the object no longer exists, we don't want to recreate it. Just bail - // out so that we can process the delete, which we should soon be receiving - // if we haven't already. - if errors.IsNotFound(err) { - klog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", - service.Namespace, service.Name, err) - return nil - } - // TODO: Try to resolve the conflict if the change was unrelated to load - // balancer status. For now, just pass it up the stack. - if errors.IsConflict(err) { - return err - } - klog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v", - service.Namespace, service.Name, err) - time.Sleep(clientRetryInterval) - } - return err -} - func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) { nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) if err != nil { From bff5f08e19dba1a68dbb2849bf7114ad60c47ca2 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Wed, 15 May 2019 21:32:28 -0700 Subject: [PATCH 2/3] Allow service controller role to patch service status Co-authored-by: Josh Horwitz --- .../auth/authorizer/rbac/bootstrappolicy/controller_policy.go | 2 +- .../rbac/bootstrappolicy/testdata/controller-roles.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index d1e83dd99cd..18a3d4aabd0 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -297,7 +297,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "service-controller"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("services").RuleOrDie(), - rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("services/status").RuleOrDie(), + rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("services/status").RuleOrDie(), rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), eventsRule(), }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 2c64f63a963..1384c1eea68 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -1117,6 +1117,7 @@ items: resources: - services/status verbs: + - patch - update - apiGroups: - "" From cda73ebbb02d155fb04cb4fb4066355003c15fae Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Thu, 16 May 2019 11:14:20 -0700 Subject: [PATCH 3/3] Fixes service controller unit test for patching Co-authored-by: Josh Horwitz --- .../service/service_controller_test.go | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index 088d3389259..c8bb1eb2302 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -23,7 +23,7 @@ import ( "strings" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -40,14 +40,22 @@ import ( const region = "us-central" func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Service { - return &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: v1.ServiceSpec{Type: serviceType}} + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: uid, + SelfLink: testapi.Default.SelfLink("services", name), + }, + Spec: v1.ServiceSpec{ + Type: serviceType, + }, + } } //Wrap newService so that you don't have to call default arguments again and again. func defaultExternalService() *v1.Service { - return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer) - } func alwaysReady() bool { return true } @@ -151,7 +159,13 @@ func TestCreateExternalLoadBalancer(t *testing.T) { for _, item := range table { controller, cloud, client := newController() - err := controller.syncLoadBalancerIfNeeded("foo/bar", item.service) + key := fmt.Sprintf("%s/%s", item.service.Namespace, item.service.Name) + if _, err := client.CoreV1().Services(item.service.Namespace).Create(item.service); err != nil { + t.Errorf("Failed to prepare service %s for testing: %v", key, err) + continue + } + client.ClearActions() + err := controller.syncLoadBalancerIfNeeded(key, item.service) if !item.expectErr && err != nil { t.Errorf("unexpected error: %v", err) } else if item.expectErr && err == nil { @@ -185,12 +199,12 @@ func TestCreateExternalLoadBalancer(t *testing.T) { } actionFound := false for _, action := range actions { - if action.GetVerb() == "update" && action.GetResource().Resource == "services" { + if action.GetVerb() == "patch" && action.GetResource().Resource == "services" { actionFound = true } } if !actionFound { - t.Errorf("expected updated service to be sent to client, got these actions instead: %v", actions) + t.Errorf("expected patch service to be sent to client, got these actions instead: %v", actions) } } } @@ -326,7 +340,7 @@ func TestGetNodeConditionPredicate(t *testing.T) { } func TestProcessServiceUpdate(t *testing.T) { - var controller *ServiceController + controller, _, client := newController() //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" @@ -345,7 +359,6 @@ func TestProcessServiceUpdate(t *testing.T) { svc: defaultExternalService(), updateFn: func(svc *v1.Service) *v1.Service { - controller, _, _ = newController() controller.cache.getOrCreate("validKey") return svc @@ -404,6 +417,9 @@ func TestProcessServiceUpdate(t *testing.T) { for _, tc := range testCases { newSvc := tc.updateFn(tc.svc) + if _, err := client.CoreV1().Services(tc.svc.Namespace).Create(tc.svc); err != nil { + t.Fatalf("Failed to prepare service %s for testing: %v", tc.key, err) + } svcCache := controller.cache.getOrCreate(tc.key) obtErr := controller.processServiceUpdate(svcCache, newSvc, tc.key) if err := tc.expectedFn(newSvc, obtErr); err != nil {