Merge pull request #41258 from shashidharatd/federation-service-controller-1

Automatic merge from submit-queue (batch tested with PRs 44942, 41258)

[Federation] Use federated informer for service controller and annotations to store lb ingress

**What this PR does / why we need it**:
This is part of breaking up PR #40296 into smaller PRs; please refer to #41253.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #
Handles two tasks in #41253.
Fixes the issues in #27623 and #35827.

**Special notes for your reviewer**:

**Release note**:
```
NONE
```

cc @quinton-hoole @nikhiljindal @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-04-26 08:57:38 -07:00 committed by GitHub
commit 2d79d53fb2
14 changed files with 1285 additions and 117 deletions


@ -19,6 +19,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",


@ -19,6 +19,7 @@ package federation
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
// ServerAddressByClientCIDR helps the client to determine the server address that they should use, depending on the clientCIDR that they match.
@ -153,3 +154,18 @@ type ClusterReplicaSetPreferences struct {
// A number expressing the preference to put an additional replica to this LocalReplicaSet. 0 by default.
Weight int64
}
// FederatedServiceIngress is stored as an annotation on a federated service and keeps a record of the service's loadbalancer ingresses in each federated cluster
type FederatedServiceIngress struct {
// List of loadbalancer ingresses of a service in all federated clusters
// +optional
Items []ClusterServiceIngress `json:"items,omitempty"`
}
// ClusterServiceIngress holds the loadbalancer ingresses of a service within a federated cluster
type ClusterServiceIngress struct {
// Cluster is the name of the federated cluster
Cluster string `json:"cluster"`
// List of loadbalancer ingresses of a federated service within a federated cluster
Items []v1.LoadBalancerIngress `json:"items"`
}
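
To make the annotation payload concrete, here is a minimal, self-contained sketch of what these types serialize to. It uses local stand-ins for the types (so it runs outside the tree), and the cluster names and addresses are hypothetical; the resulting string is what gets stored under the `federation.kubernetes.io/service-ingresses` annotation key defined in ingress.go later in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for v1.LoadBalancerIngress and the federation types above,
// so this sketch compiles on its own.
type LoadBalancerIngress struct {
	IP       string `json:"ip,omitempty"`
	Hostname string `json:"hostname,omitempty"`
}

type ClusterServiceIngress struct {
	Cluster string                `json:"cluster"`
	Items   []LoadBalancerIngress `json:"items"`
}

type FederatedServiceIngress struct {
	Items []ClusterServiceIngress `json:"items,omitempty"`
}

func main() {
	// Hypothetical loadbalancer state for a service in two federated clusters.
	ingress := FederatedServiceIngress{Items: []ClusterServiceIngress{
		{Cluster: "c1", Items: []LoadBalancerIngress{{IP: "198.51.100.1"}}},
		{Cluster: "c2", Items: []LoadBalancerIngress{{Hostname: "lb.example.com"}}},
	}}
	b, _ := json.Marshal(ingress)
	fmt.Println(string(b))
	// Output:
	// {"items":[{"cluster":"c1","items":[{"ip":"198.51.100.1"}]},{"cluster":"c2","items":[{"hostname":"lb.example.com"}]}]}
}
```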


@ -25,6 +25,7 @@ import (
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
api "k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
reflect "reflect"
)
@ -40,9 +41,11 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error {
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterCondition, InType: reflect.TypeOf(&ClusterCondition{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterList, InType: reflect.TypeOf(&ClusterList{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterReplicaSetPreferences, InType: reflect.TypeOf(&ClusterReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterServiceIngress, InType: reflect.TypeOf(&ClusterServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterSpec, InType: reflect.TypeOf(&ClusterSpec{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterStatus, InType: reflect.TypeOf(&ClusterStatus{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedReplicaSetPreferences, InType: reflect.TypeOf(&FederatedReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedServiceIngress, InType: reflect.TypeOf(&FederatedServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ServerAddressByClientCIDR, InType: reflect.TypeOf(&ServerAddressByClientCIDR{})},
)
}
@ -110,6 +113,20 @@ func DeepCopy_federation_ClusterReplicaSetPreferences(in interface{}, out interf
}
}
func DeepCopy_federation_ClusterServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ClusterServiceIngress)
out := out.(*ClusterServiceIngress)
*out = *in
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]api_v1.LoadBalancerIngress, len(*in))
copy(*out, *in)
}
return nil
}
}
func DeepCopy_federation_ClusterSpec(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ClusterSpec)
@ -172,6 +189,24 @@ func DeepCopy_federation_FederatedReplicaSetPreferences(in interface{}, out inte
}
}
func DeepCopy_federation_FederatedServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*FederatedServiceIngress)
out := out.(*FederatedServiceIngress)
*out = *in
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ClusterServiceIngress, len(*in))
for i := range *in {
if err := DeepCopy_federation_ClusterServiceIngress(&(*in)[i], &(*out)[i], c); err != nil {
return err
}
}
}
return nil
}
}
func DeepCopy_federation_ServerAddressByClientCIDR(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ServerAddressByClientCIDR)


@ -15,11 +15,13 @@ go_library(
"dns.go",
"doc.go",
"endpoint_helper.go",
"ingress.go",
"service_helper.go",
"servicecontroller.go",
],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/cache:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
@ -36,8 +38,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@ -46,6 +50,7 @@ go_library(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
@ -62,10 +67,22 @@ go_test(
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)


@ -24,8 +24,10 @@ import (
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"k8s.io/kubernetes/pkg/api/v1"
)
const (
@ -34,7 +36,7 @@ const (
)
// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedService *cachedService) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
var (
zoneNames []string
regionName string
@ -42,16 +44,24 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
if zoneNames, regionName, err = s.getClusterZoneNames(clusterName); err != nil {
return nil, nil, nil, err
}
for lbClusterName, lbStatus := range cachedService.serviceStatusMap {
// If federated service is deleted, return empty endpoints, so that DNS records are removed
if service.DeletionTimestamp != nil {
return zoneEndpoints, regionEndpoints, globalEndpoints, nil
}
serviceIngress, err := ParseFederatedServiceIngress(service)
if err != nil {
return nil, nil, nil, err
}
for _, lbClusterIngress := range serviceIngress.Items {
lbClusterName := lbClusterIngress.Cluster
lbZoneNames, lbRegionName, err := s.getClusterZoneNames(lbClusterName)
if err != nil {
return nil, nil, nil, err
}
for _, ingress := range lbStatus.Ingress {
readyEndpoints, ok := cachedService.endpointMap[lbClusterName]
if !ok || readyEndpoints == 0 {
continue
}
for _, ingress := range lbClusterIngress.Items {
var address string
// We should get either an IP address or a hostname - use whichever one we get
if ingress.IP != "" {
@ -61,7 +71,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}
if len(address) <= 0 {
return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
cachedService.lastState.Name, cachedService.lastState.Namespace, clusterName)
service.Name, service.Namespace, clusterName)
}
for _, lbZoneName := range lbZoneNames {
for _, zoneName := range zoneNames {
@ -80,15 +90,12 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, cachedServic
}
// getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []string, region string, err error) {
client, ok := s.clusterCache.clientMap[clusterName]
if !ok {
return nil, "", fmt.Errorf("Cluster cache does not contain entry for cluster %s", clusterName)
func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
if err != nil {
return nil, "", err
}
if client.cluster == nil {
return nil, "", fmt.Errorf("Cluster cache entry for cluster %s is nil", clusterName)
}
return client.cluster.Status.Zones, client.cluster.Status.Region, nil
return cluster.Status.Zones, cluster.Status.Region, nil
}
// getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
@ -284,7 +291,7 @@ given the current state of that service in that cluster. This should be called
(or vice versa). Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint
are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed. Also, if no shards exist
in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */
func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *cachedService) error {
func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
// Quinton: Pseudocode....
// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
// For each service we need the following DNS names:
@ -298,21 +305,21 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// - a set of A records to IP addresses of all healthy shards in all regions, if one or more of these exist.
// - no record (NXRECORD response) if no healthy shards exist in any regions
//
// For each cached service, cachedService.lastState tracks the current known state of the service, while cachedService.appliedState contains
// the state of the service when we last successfully synced its DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records).
// Each service stores the current known state of its loadbalancer ingresses in the federated clusters in an annotation.
// So generate the desired DNS records based on that state and ensure the desired DNS records match the
// actual DNS records (add new records, remove deleted records, and update changed records).
//
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
if service == nil {
return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
}
serviceName := cachedService.lastState.Name
namespaceName := cachedService.lastState.Namespace
serviceName := service.Name
namespaceName := service.Namespace
zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
if err != nil {
return err
@ -324,7 +331,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, cachedService)
zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
if err != nil {
return err
}


@ -17,21 +17,33 @@ limitations under the License.
package service
import (
"sync"
"testing"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/v1"
)
func TestServiceController_ensureDnsRecords(t *testing.T) {
cluster1Name := "c1"
cluster2Name := "c2"
cluster1 := NewClusterWithRegionZone(cluster1Name, v1.ConditionTrue, "fooregion", "foozone")
cluster2 := NewClusterWithRegionZone(cluster2Name, v1.ConditionTrue, "barregion", "barzone")
globalDNSName := "servicename.servicenamespace.myfederation.svc.federation.example.com"
fooRegionDNSName := "servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com"
fooZoneDNSName := "servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com"
barRegionDNSName := "servicename.servicenamespace.myfederation.svc.barregion.federation.example.com"
barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com"
tests := []struct {
name string
service v1.Service
@ -39,18 +51,24 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
serviceStatus v1.LoadBalancerStatus
}{
{
name: "withip",
name: "ServiceWithSingleLBIngress",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
String()},
},
},
serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}),
expected: []string{
"example.com:servicename.servicenamespace.myfederation.svc.federation.example.com:A:180:[198.51.100.1]",
"example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:A:180:[198.51.100.1]",
"example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:A:180:[198.51.100.1]",
"example.com:" + globalDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]",
"example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]",
},
},
/*
@ -66,14 +84,14 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
},
serviceStatus: buildServiceStatus([][]string{{"", "randomstring.amazonelb.example.com"}}),
expected: []string{
"example.com:servicename.servicenamespace.myfederation.svc.federation.example.com:A:180:[198.51.100.1]",
"example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:A:180:[198.51.100.1]",
"example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:A:180:[198.51.100.1]",
"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
"example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]",
"example.com:"+fooZoneDNSName+":A:180:[198.51.100.1]",
},
},
*/
{
name: "noendpoints",
name: "ServiceWithNoLBIngress",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
@ -81,8 +99,96 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
},
},
expected: []string{
"example.com:servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com:CNAME:180:[servicename.servicenamespace.myfederation.svc.federation.example.com]",
"example.com:servicename.servicenamespace.myfederation.svc.foozone.fooregion.federation.example.com:CNAME:180:[servicename.servicenamespace.myfederation.svc.fooregion.federation.example.com]",
"example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]",
"example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]",
},
},
{
name: "ServiceWithMultipleLBIngress",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
String()},
},
},
expected: []string{
"example.com:" + globalDNSName + ":A:180:[198.51.100.1 198.51.200.1]",
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]",
"example.com:" + barRegionDNSName + ":A:180:[198.51.200.1]",
"example.com:" + barZoneDNSName + ":A:180:[198.51.200.1]",
},
},
{
name: "ServiceWithLBIngressAndServiceDeleted",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
DeletionTimestamp: &metav1.Time{Time: time.Now()},
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
String()},
},
},
expected: []string{
// TODO: Ideally we should expect that there are no DNS records when the federated service is deleted. Need to remove these leaks in the future
"example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]",
"example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]",
},
},
{
name: "ServiceWithMultipleLBIngressAndOneLBIngressGettingRemoved",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
RemoveEndpoint(cluster2Name, "198.51.200.1").
String()},
},
},
expected: []string{
"example.com:" + globalDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]",
"example.com:" + fooZoneDNSName + ":A:180:[198.51.100.1]",
"example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]",
},
},
{
name: "ServiceWithMultipleLBIngressAndAllLBIngressGettingRemoved",
service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "servicename",
Namespace: "servicenamespace",
Annotations: map[string]string{
FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
RemoveEndpoint(cluster1Name, "198.51.100.1").
RemoveEndpoint(cluster2Name, "198.51.200.1").
String()},
},
},
expected: []string{
"example.com:" + fooRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + fooZoneDNSName + ":CNAME:180:[" + fooRegionDNSName + "]",
"example.com:" + barRegionDNSName + ":CNAME:180:[" + globalDNSName + "]",
"example.com:" + barZoneDNSName + ":CNAME:180:[" + barRegionDNSName + "]",
},
},
}
@ -92,7 +198,10 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
if !ok {
t.Error("Unable to fetch zones")
}
fakeClient := &fakefedclientset.Clientset{}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}})
serviceController := ServiceController{
federationClient: fakeClient,
dns: fakedns,
dnsZones: fakednsZones,
serviceDnsSuffix: "federation.example.com",
@ -106,9 +215,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
knownClusterSet: make(sets.String),
}
clusterName := "testcluster"
serviceController.clusterCache.clientMap[clusterName] = &clusterCache{
serviceController.clusterCache.clientMap[cluster1Name] = &clusterCache{
cluster: &v1beta1.Cluster{
Status: v1beta1.ClusterStatus{
Zones: []string{"foozone"},
@ -122,12 +229,16 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
endpointMap: make(map[string]int),
serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
}
cachedService.endpointMap[clusterName] = 1
cachedService.endpointMap[cluster1Name] = 1
if !reflect.DeepEqual(&test.serviceStatus, &v1.LoadBalancerStatus{}) {
cachedService.serviceStatusMap[clusterName] = test.serviceStatus
cachedService.serviceStatusMap[cluster1Name] = test.serviceStatus
}
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(cluster1Name, &test.service)
if err != nil {
t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
}
err = serviceController.ensureDnsRecords(cluster2Name, &test.service)
if err != nil {
t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
}
@ -138,7 +249,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
}
// Dump every record to a testable-by-string-comparison form
var records []string
records := []string{}
for _, z := range zones {
zoneName := z.Name()


@ -122,7 +122,7 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
@ -154,7 +154,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}
@ -175,7 +175,7 @@ func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState)
if err == nil {
return nil
}


@ -21,14 +21,18 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
v1 "k8s.io/kubernetes/pkg/api/v1"
)
var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required.
var fakeDnsZones, _ = fakeDns.Zones()
var fakeClient = &fakefedclientset.Clientset{}
var fakeServiceController = ServiceController{
federationClient: fakeClient,
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
@ -68,6 +72,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService
@ -121,6 +126,7 @@ func TestProcessEndpointDeletion(t *testing.T) {
},
},
}
RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}})
tests := []struct {
name string
cachedService *cachedService


@ -0,0 +1,136 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"encoding/json"
"sort"
"strings"
fedapi "k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/pkg/api/v1"
)
// Compile time check for interface adherence
var _ sort.Interface = &FederatedServiceIngress{}
const (
FederatedServiceIngressAnnotation = "federation.kubernetes.io/service-ingresses"
)
// FederatedServiceIngress implements sort.Interface.
type FederatedServiceIngress struct {
fedapi.FederatedServiceIngress
}
func NewFederatedServiceIngress() *FederatedServiceIngress {
return &FederatedServiceIngress{}
}
func (ingress *FederatedServiceIngress) String() string {
annotationBytes, _ := json.Marshal(ingress)
return string(annotationBytes[:])
}
// Len satisfies sort.Interface.
func (ingress *FederatedServiceIngress) Len() int {
return len(ingress.Items)
}
// Less satisfies sort.Interface.
func (ingress *FederatedServiceIngress) Less(i, j int) bool {
return (strings.Compare(ingress.Items[i].Cluster, ingress.Items[j].Cluster) < 0)
}
// Swap satisfies sort.Interface.
func (ingress *FederatedServiceIngress) Swap(i, j int) {
ingress.Items[i].Cluster, ingress.Items[j].Cluster = ingress.Items[j].Cluster, ingress.Items[i].Cluster
ingress.Items[i].Items, ingress.Items[j].Items = ingress.Items[j].Items, ingress.Items[i].Items
}
// GetClusterLoadBalancerIngresses returns the loadbalancer ingresses for the given cluster if they exist, otherwise an empty slice
func (ingress *FederatedServiceIngress) GetClusterLoadBalancerIngresses(cluster string) []v1.LoadBalancerIngress {
for _, clusterIngress := range ingress.Items {
if cluster == clusterIngress.Cluster {
return clusterIngress.Items
}
}
return []v1.LoadBalancerIngress{}
}
// AddClusterLoadBalancerIngresses adds the loadbalancer ingresses for the given cluster to the federated service ingress
func (ingress *FederatedServiceIngress) AddClusterLoadBalancerIngresses(cluster string, loadbalancerIngresses []v1.LoadBalancerIngress) {
for i, clusterIngress := range ingress.Items {
if cluster == clusterIngress.Cluster {
ingress.Items[i].Items = append(ingress.Items[i].Items, loadbalancerIngresses...)
return
}
}
clusterNewIngress := fedapi.ClusterServiceIngress{Cluster: cluster, Items: loadbalancerIngresses}
ingress.Items = append(ingress.Items, clusterNewIngress)
sort.Sort(ingress)
}
// AddEndpoints adds one or more endpoints to the federated service ingress.
// Endpoints are the federated cluster's loadbalancer IPs/hostnames for the service
func (ingress *FederatedServiceIngress) AddEndpoints(cluster string, endpoints []string) *FederatedServiceIngress {
lbIngress := []v1.LoadBalancerIngress{}
for _, endpoint := range endpoints {
lbIngress = append(lbIngress, v1.LoadBalancerIngress{IP: endpoint})
}
ingress.AddClusterLoadBalancerIngresses(cluster, lbIngress)
return ingress
}
// RemoveEndpoint removes a single endpoint (ip/hostname) from the federated service ingress
func (ingress *FederatedServiceIngress) RemoveEndpoint(cluster string, endpoint string) *FederatedServiceIngress {
for i, clusterIngress := range ingress.Items {
if cluster == clusterIngress.Cluster {
for j, lbIngress := range clusterIngress.Items {
if lbIngress.IP == endpoint {
ingress.Items[i].Items = append(ingress.Items[i].Items[:j], ingress.Items[i].Items[j+1:]...)
}
}
}
}
return ingress
}
// ParseFederatedServiceIngress extracts federated service ingresses from a federated service
func ParseFederatedServiceIngress(service *v1.Service) (*FederatedServiceIngress, error) {
ingress := FederatedServiceIngress{}
if service.Annotations == nil {
return &ingress, nil
}
federatedServiceIngressString, found := service.Annotations[FederatedServiceIngressAnnotation]
if !found {
return &ingress, nil
}
if err := json.Unmarshal([]byte(federatedServiceIngressString), &ingress); err != nil {
return &ingress, err
}
return &ingress, nil
}
// UpdateIngressAnnotation updates the federated service with service ingress annotation
func UpdateIngressAnnotation(service *v1.Service, ingress *FederatedServiceIngress) *v1.Service {
if service.Annotations == nil {
service.Annotations = make(map[string]string)
}
service.Annotations[FederatedServiceIngressAnnotation] = ingress.String()
return service
}
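
For reference, a short usage sketch of the helpers above. It assumes it lives in this service package; the function name, cluster names, and IPs are hypothetical.

```go
// exampleIngressAnnotation is a usage sketch, not part of the controller.
func exampleIngressAnnotation() {
	svc := &v1.Service{}

	// Record one loadbalancer IP per cluster, then drop cluster c2's again.
	ingress := NewFederatedServiceIngress().
		AddEndpoints("c1", []string{"198.51.100.1"}).
		AddEndpoints("c2", []string{"198.51.200.1"}).
		RemoveEndpoint("c2", "198.51.200.1")

	// Persist the state on the federated service as an annotation...
	svc = UpdateIngressAnnotation(svc, ingress)

	// ...and read it back later, e.g. on the DNS reconciliation path.
	parsed, err := ParseFederatedServiceIngress(svc)
	if err != nil {
		return // malformed annotation
	}

	// c1 still has its ingress; c2's list is empty after the removal.
	_ = parsed.GetClusterLoadBalancerIngresses("c1") // [{IP: "198.51.100.1"}]
}
```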


@ -108,7 +108,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
if needUpdate {
for i := 0; i < clientRetryCount; i++ {
err := sc.ensureDnsRecords(clusterName, cachedService)
err := sc.ensureDnsRecords(clusterName, service)
if err == nil {
break
}


@ -18,6 +18,8 @@ package service
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -27,8 +29,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -36,7 +40,9 @@ import (
clientv1 "k8s.io/client-go/pkg/api/v1"
cache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fedapi "k8s.io/kubernetes/federation/apis/federation"
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationcache "k8s.io/kubernetes/federation/client/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
@ -74,6 +80,7 @@ const (
maxNoOfClusters = 100
reviewDelay = 10 * time.Second
updateTimeout = 30 * time.Second
allClustersKey = "ALL_CLUSTERS"
clusterAvailableDelay = time.Second * 20
@ -154,6 +161,15 @@ type ServiceController struct {
clusterDeliverer *util.DelayingDeliverer
deletionHelper *deletionhelper.DeletionHelper
reviewDelay time.Duration
clusterAvailableDelay time.Duration
updateTimeout time.Duration
endpointFederatedInformer fedutil.FederatedInformer
federatedUpdater fedutil.FederatedUpdater
objectDeliverer *util.DelayingDeliverer
flowcontrolBackoff *flowcontrol.Backoff
}
// New returns a new service controller to keep DNS provider service resources
@ -178,11 +194,16 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
rwlock: sync.Mutex{},
clientMap: make(map[string]*clusterCache),
},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.New(),
knownClusterSet: make(sets.String),
reviewDelay: reviewDelay,
clusterAvailableDelay: clusterAvailableDelay,
updateTimeout: updateTimeout,
flowcontrolBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
}
s.objectDeliverer = util.NewDelayingDeliverer()
s.clusterDeliverer = util.NewDelayingDeliverer()
var serviceIndexer cache.Indexer
serviceIndexer, s.serviceController = cache.NewIndexerInformer(
@ -196,57 +217,13 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
},
&v1.Service{},
serviceSyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
// there is case that old and new are equals but we still catch the event now.
if !reflect.DeepEqual(old, cur) {
s.enqueueService(cur)
}
},
DeleteFunc: s.enqueueService,
},
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
glog.V(5).Infof("Delivering notification from federation: %v", obj)
s.deliverObject(obj, 0, false)
}),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
s.serviceStore = corelisters.NewServiceLister(serviceIndexer)
s.clusterStore.Store, s.clusterController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return s.federationClient.Federation().Clusters().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return s.federationClient.Federation().Clusters().Watch(options)
},
},
&v1beta1.Cluster{},
clusterSyncPeriod,
cache.ResourceEventHandlerFuncs{
DeleteFunc: s.clusterCache.delFromClusterSet,
AddFunc: s.clusterCache.addToClientMap,
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*v1beta1.Cluster)
if !ok {
return
}
curCluster, ok := cur.(*v1beta1.Cluster)
if !ok {
return
}
if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
// update when spec is changed
s.clusterCache.addToClientMap(cur)
}
pred := getClusterConditionPredicate()
// only update when condition changed to ready from not-ready
if !pred(*oldCluster) && pred(*curCluster) {
s.clusterCache.addToClientMap(cur)
}
// did not handle ready -> not-ready
// how could we stop a controller?
},
},
)
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *v1beta1.Cluster) {
@ -269,14 +246,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
// would be just confirmation that some service operation succeeded.
util.NewTriggerOnAllChanges(
func(obj pkgruntime.Object) {
// TODO: Use this to enqueue services.
glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj)
s.deliverObject(obj, s.reviewDelay, false)
},
))
}
s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Create(svc)
@ -291,9 +269,41 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
svc := obj.(*v1.Service)
orphanDependents := false
err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
return nil
}
return err
})
// Federated informers on endpoints in federated clusters.
// This enables checking whether the service ingress endpoints in federated clusters are reachable
s.endpointFederatedInformer = fedutil.NewFederatedInformer(
federationClient,
func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (
cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return targetClient.Core().Endpoints(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return targetClient.Core().Endpoints(metav1.NamespaceAll).Watch(options)
},
},
&v1.Endpoints{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndFieldChanges(
"Subsets",
func(obj pkgruntime.Object) {
glog.V(5).Infof("Delivering endpoint notification from federated cluster %s :%v", cluster.Name, obj)
s.deliverObject(obj, s.reviewDelay, false)
},
))
},
&fedutil.ClusterLifecycleHandlerFuncs{},
)
s.deletionHelper = deletionhelper.NewDeletionHelper(
s.updateService,
// objNameFunc
@ -304,7 +314,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
updateTimeout,
s.eventRecorder,
s.federatedInformer,
federatedUpdater,
s.federatedUpdater,
)
s.endpointWorkerMap = make(map[string]bool)
@ -348,22 +358,27 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
}
defer runtime.HandleCrash()
s.federatedInformer.Start()
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
s.endpointFederatedInformer.Start()
s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
s.queue.Add(item.Value.(string))
})
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
s.deliverServicesOnClusterChange()
})
fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
go s.serviceController.Run(stopCh)
go s.clusterController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(s.fedServiceWorker, time.Second, stopCh)
}
go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
go wait.Until(s.clusterSyncLoop, time.Second, stopCh)
go func() {
<-stopCh
glog.Infof("Shutting down Federation Service Controller")
s.queue.ShutDown()
s.federatedInformer.Stop()
s.endpointFederatedInformer.Stop()
s.objectDeliverer.Stop()
s.clusterDeliverer.Stop()
}()
return nil
}
@ -416,8 +431,16 @@ func (s *ServiceController) init() error {
return nil
}
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK")
statusRecoverableError = reconciliationStatus("RECOVERABLE_ERROR")
statusNonRecoverableError = reconciliationStatus("NON_RECOVERABLE_ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncService is never invoked concurrently with the same key.
func (s *ServiceController) fedServiceWorker() {
for {
func() {
@ -425,11 +448,21 @@ func (s *ServiceController) fedServiceWorker() {
if quit {
return
}
defer s.queue.Done(key)
err := s.syncService(key.(string))
if err != nil {
glog.Errorf("Error syncing service: %v", err)
service := key.(string)
status := s.reconcileService(service)
switch status {
case statusAllOk:
break
case statusNotSynced:
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
s.deliverService(service, s.clusterAvailableDelay, false)
case statusRecoverableError:
s.deliverService(service, 0, true)
case statusNonRecoverableError:
// error is already logged, do nothing
default:
// unreachable
}
}()
}
@ -826,7 +859,7 @@ func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clust
for key := range s.clusterCache.clientMap {
for _, clusterName := range clusterNames {
if key == clusterName {
err := s.ensureDnsRecords(clusterName, service)
err := s.ensureDnsRecords(clusterName, service.lastState)
if err != nil {
unensuredCount += 1
glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
@ -1020,6 +1053,24 @@ func (s *ServiceController) delete(service *v1.Service) error {
return err
}
// Ensure DNS records are removed for service
if wantsDNSRecords(service) {
key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
serviceIngress, err := ParseFederatedServiceIngress(service)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
return err
}
for _, ingress := range serviceIngress.Items {
err := s.ensureDnsRecords(ingress.Cluster, service)
if err != nil {
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err)
return err
}
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
}
}
err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
if err != nil {
// It's all good if the error is a not-found error. That means the service is deleted already and we do not have to do anything.
@ -1040,3 +1091,415 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
s.serviceCache.delete(key)
return nil, doNotRetry
}
func (s *ServiceController) deliverServicesOnClusterChange() {
if !s.isSynced() {
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay)
}
glog.V(5).Infof("Delivering all service as cluster status changed")
serviceList, err := s.serviceStore.List(labels.Everything())
if err != nil {
runtime.HandleError(fmt.Errorf("error listing federated services: %v", err))
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, 0)
}
for _, service := range serviceList {
s.deliverObject(service, 0, false)
}
}
func (s *ServiceController) deliverObject(object interface{}, delay time.Duration, failed bool) {
switch value := object.(type) {
case *v1.Service:
s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
case *v1.Endpoints:
s.deliverService(types.NamespacedName{Namespace: value.Namespace, Name: value.Name}.String(), delay, failed)
default:
glog.Warningf("Unknown object received: %v", object)
}
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (s *ServiceController) deliverService(key string, delay time.Duration, failed bool) {
if failed {
s.flowcontrolBackoff.Next(key, time.Now())
delay = delay + s.flowcontrolBackoff.Get(key)
} else {
s.flowcontrolBackoff.Reset(key)
}
s.objectDeliverer.DeliverAfter(key, key, delay)
}
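
For intuition about the delay added here, a small, self-contained sketch of how the client-go flowcontrol backoff behaves with the same parameters New passes to flowcontrol.NewBackOff (5s initial, 1 minute cap); the key is hypothetical.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Same parameters the controller uses: 5s initial backoff, capped at 1 minute.
	backoff := flowcontrol.NewBackOff(5*time.Second, time.Minute)
	key := "servicenamespace/servicename"

	for i := 0; i < 4; i++ {
		backoff.Next(key, time.Now()) // record a failed delivery
		fmt.Println(backoff.Get(key)) // 5s, 10s, 20s, 40s: doubles per failure
	}

	backoff.Reset(key)            // a successful delivery clears the backoff
	fmt.Println(backoff.Get(key)) // 0s
}
```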
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet synced with
// the corresponding api server.
func (s *ServiceController) isSynced() bool {
if !s.federatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
serviceClusters, err := s.federatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
return false
}
if !s.federatedInformer.GetTargetStore().ClustersSynced(serviceClusters) {
return false
}
if !s.endpointFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
endpointClusters, err := s.endpointFederatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
return false
}
if !s.endpointFederatedInformer.GetTargetStore().ClustersSynced(endpointClusters) {
return false
}
return true
}
// reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters.
// This function is called on service addition/deletion/update either in a federated cluster or in the federation.
func (s *ServiceController) reconcileService(key string) reconciliationStatus {
if !s.isSynced() {
glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key)
return statusNotSynced
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("Invalid key %q recieved, unable to split key to namespace and name, err: %v", key, err))
return statusNonRecoverableError
}
service, err := s.serviceStore.Services(namespace).Get(name)
if errors.IsNotFound(err) {
// Not a federated service, ignoring.
return statusAllOk
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err))
return statusRecoverableError
}
glog.V(3).Infof("Reconciling federated service: %s", key)
// Create a copy before modifying the service to prevent a race condition with other readers of the service from the store
fedServiceObj, err := api.Scheme.DeepCopy(service)
if err != nil {
runtime.HandleError(fmt.Errorf("Error in copying obj: %s, %v", key, err))
return statusNonRecoverableError
}
fedService, ok := fedServiceObj.(*v1.Service)
if err != nil || !ok {
runtime.HandleError(fmt.Errorf("Unknown obj recieved from store: %#v, %v", fedServiceObj, err))
return statusNonRecoverableError
}
// Handle deletion of federated service
if fedService.DeletionTimestamp != nil {
if err := s.delete(fedService); err != nil {
runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err))
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err)
return statusRecoverableError
}
glog.V(3).Infof("Deleting federated service succeeded: %s", key)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded")
return statusAllOk
}
// Add the required finalizers before creating a service in underlying clusters. This ensures that the
// dependent services in underlying clusters are deleted when the federated service is deleted.
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to ensure setting finalizer for service %s: %v", key, err))
return statusRecoverableError
}
fedService = updatedServiceObj.(*v1.Service)
// Synchronize the federated service in all underlying ready clusters.
clusters, err := s.federatedInformer.GetReadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err))
return statusRecoverableError
}
newLBStatus := newLoadbalancerStatus()
newServiceIngress := NewFederatedServiceIngress()
operations := make([]fedutil.FederatedOperation, 0)
for _, cluster := range clusters {
// Aggregate all operations to perform on all federated clusters
operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
if err != nil {
return statusRecoverableError
}
if operation != nil {
operations = append(operations, *operation)
}
// Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service
lbStatus, err := s.getServiceStatusInCluster(cluster, key)
if err != nil {
return statusRecoverableError
}
if len(lbStatus.Ingress) > 0 {
newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...)
// Add/Update federated service ingress only if there are reachable endpoints backing the lb service
endpoints, err := s.getServiceEndpointsInCluster(cluster, key)
if err != nil {
return statusRecoverableError
}
// if there are no endpoints created for the service then the loadbalancer ingress
// is not reachable, so do not consider such loadbalancer ingresses for federated
// service ingresses
if len(endpoints) > 0 {
clusterIngress := fedapi.ClusterServiceIngress{
Cluster: cluster.Name,
Items: lbStatus.Ingress,
}
newServiceIngress.Items = append(newServiceIngress.Items, clusterIngress)
}
}
}
if len(operations) != 0 {
err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout,
func(op fedutil.FederatedOperation, operror error) {
runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror))
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
if !errors.IsAlreadyExists(err) {
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
return statusRecoverableError
}
}
}
// Update the federated service if there are any updates in clustered service (status/endpoints)
err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress)
if err != nil {
return statusRecoverableError
}
glog.V(5).Infof("Everything is in order in federated clusters for service %s", key)
return statusAllOk
}
// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service
func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) {
var operation *fedutil.FederatedOperation
key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
return nil, err
}
if !serviceFound {
desiredService := &v1.Service{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
}
desiredService.ResourceVersion = ""
glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name)
operation = &fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: desiredService,
ClusterName: cluster.Name,
}
} else {
clusterService, ok := clusterServiceObj.(*v1.Service)
if !ok {
runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err))
return nil, err
}
desiredService := &v1.Service{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta),
Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
}
// ClusterIP and NodePort are allocated to the Service by the cluster, so retain them, if any, while updating
desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP
for _, cPort := range clusterService.Spec.Ports {
for i, fPort := range clusterService.Spec.Ports {
if fPort.Name == cPort.Name && fPort.Protocol == cPort.Protocol && fPort.Port == cPort.Port {
desiredService.Spec.Ports[i].NodePort = cPort.NodePort
}
}
}
// Update existing service, if needed.
if !Equivalent(desiredService, clusterService) {
glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService)
// ResourceVersion of cluster service can be different from federated service,
// so do not update ResourceVersion while updating cluster service
desiredService.ResourceVersion = clusterService.ResourceVersion
operation = &fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: desiredService,
ClusterName: cluster.Name,
}
} else {
glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)
}
}
return operation, nil
}
// getServiceStatusInCluster returns service status in federated cluster
func (s *ServiceController) getServiceStatusInCluster(cluster *v1beta1.Cluster, key string) (*v1.LoadBalancerStatus, error) {
lbStatus := &v1.LoadBalancerStatus{}
clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
return lbStatus, err
}
if serviceFound {
clusterService, ok := clusterServiceObj.(*v1.Service)
if !ok {
err = fmt.Errorf("Unknown object received: %v", clusterServiceObj)
runtime.HandleError(err)
return lbStatus, err
}
lbStatus = &clusterService.Status.LoadBalancer
newLbStatus := &loadbalancerStatus{*lbStatus}
sort.Sort(newLbStatus)
}
return lbStatus, nil
}
// getServiceEndpointsInCluster returns the ready endpoints corresponding to the service in the federated cluster
func (s *ServiceController) getServiceEndpointsInCluster(cluster *v1beta1.Cluster, key string) ([]v1.EndpointAddress, error) {
addresses := []v1.EndpointAddress{}
clusterEndpointsObj, endpointsFound, err := s.endpointFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get %s endpoint from %s: %v", key, cluster.Name, err))
return addresses, err
}
if endpointsFound {
clusterEndpoints, ok := clusterEndpointsObj.(*v1.Endpoints)
if !ok {
glog.Warningf("Unknown object received: %v", clusterEndpointsObj)
return addresses, fmt.Errorf("Unknown object received: %v", clusterEndpointsObj)
}
for _, subset := range clusterEndpoints.Subsets {
if len(subset.Addresses) > 0 {
addresses = append(addresses, subset.Addresses...)
}
}
}
return addresses, nil
}
// updateFederatedService updates the federated service with aggregated lbStatus and serviceIngresses
// and also updates the dns records as needed
func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *FederatedServiceIngress) error {
key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
needUpdate := false
// Sort the endpoints so that we can compare
sort.Sort(newLBStatus)
if !reflect.DeepEqual(fedService.Status.LoadBalancer.Ingress, newLBStatus.Ingress) {
fedService.Status.LoadBalancer.Ingress = newLBStatus.Ingress
glog.V(3).Infof("Federated service loadbalancer status updated for %s: %v", key, newLBStatus.Ingress)
needUpdate = true
}
existingServiceIngress, err := ParseFederatedServiceIngress(fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
return err
}
// TODO: We should have a reliable cluster health check (should consider quorum) to detect that a cluster is not
// reachable and remove dns records for it. Until a reliable cluster health check is available, the code below is
// a workaround to not remove the existing dns records which were created before the cluster went offline.
unreadyClusters, err := s.federatedInformer.GetUnreadyClusters()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get unready cluster list: %v", err))
return err
}
for _, cluster := range unreadyClusters {
lbIngress := existingServiceIngress.GetClusterLoadBalancerIngresses(cluster.Name)
newServiceIngress.AddClusterLoadBalancerIngresses(cluster.Name, lbIngress)
glog.V(5).Infof("Cluster %s is Offline, Preserving previously available status for Service %s", cluster.Name, key)
}
// Update federated service status and/or ingress annotations if changed
sort.Sort(newServiceIngress)
if !reflect.DeepEqual(existingServiceIngress.Items, newServiceIngress.Items) {
fedService = UpdateIngressAnnotation(fedService, newServiceIngress)
glog.V(3).Infof("Federated service loadbalancer ingress updated for %s: existing: %#v, desired: %#v", key, existingServiceIngress, newServiceIngress)
needUpdate = true
}
if needUpdate {
var err error
fedService, err = s.federationClient.Core().Services(fedService.Namespace).UpdateStatus(fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Error updating the federation service object %s: %v", key, err))
return err
}
}
// Ensure DNS records based on Annotations in federated service for all federated clusters
if needUpdate && wantsDNSRecords(fedService) {
for _, ingress := range newServiceIngress.Items {
err := s.ensureDnsRecords(ingress.Cluster, fedService)
if err != nil {
runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err))
return err
}
glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
}
}
return nil
}
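// For illustration, the annotation that ParseFederatedServiceIngress and
// UpdateIngressAnnotation round-trip above serializes a FederatedServiceIngress; a
// minimal sketch of its JSON value, assuming the declared json tags (cluster names
// and IPs invented):
//
//	{"items":[{"cluster":"cluster1","items":[{"ip":"10.20.30.40"}]},
//	          {"cluster":"cluster2","items":[{"ip":"10.20.30.50"}]}]}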
// Equivalent checks whether the cluster-independent, user-provided data in two given services is equal. If in the future
// the services structure is expanded, then any field that is not populated by the api server should be included here.
func Equivalent(s1, s2 *v1.Service) bool {
// TODO: should also check for all annotations except FederatedServiceIngressAnnotation
return s1.Name == s2.Name && s1.Namespace == s2.Namespace &&
(reflect.DeepEqual(s1.Labels, s2.Labels) || (len(s1.Labels) == 0 && len(s2.Labels) == 0)) &&
reflect.DeepEqual(s1.Spec, s2.Spec)
}
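// For illustration, using the NewService test helper added later in this PR
// (values assumed): services differing only in fields the API server populates,
// such as ResourceVersion or Status, compare as Equivalent, while a Spec change
// does not:
//
//	a, b := NewService("s", 80), NewService("s", 80)
//	b.ResourceVersion = "42"  // still Equivalent(a, b)
//	b.Spec.Ports[0].Port = 81 // now !Equivalent(a, b)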
type loadbalancerStatus struct {
v1.LoadBalancerStatus
}
func newLoadbalancerStatus() *loadbalancerStatus {
return &loadbalancerStatus{}
}
func (lbs loadbalancerStatus) Len() int {
return len(lbs.Ingress)
}
func (lbs loadbalancerStatus) Less(i, j int) bool {
if ipComparison := strings.Compare(lbs.Ingress[i].IP, lbs.Ingress[j].IP); ipComparison != 0 {
return ipComparison < 0
}
return strings.Compare(lbs.Ingress[i].Hostname, lbs.Ingress[j].Hostname) < 0
}
// Swap exchanges whole ingress entries, so the sort stays correct even if
// v1.LoadBalancerIngress gains additional fields.
func (lbs loadbalancerStatus) Swap(i, j int) {
lbs.Ingress[i], lbs.Ingress[j] = lbs.Ingress[j], lbs.Ingress[i]
}
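// Sketch of the ordering the sort.Interface above provides (addresses assumed):
// ingresses sort by IP, then by hostname, which makes the DeepEqual comparisons
// of aggregated statuses insensitive to the order in which clusters reported.
//
//	lbs := &loadbalancerStatus{v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{
//		{IP: "10.20.30.50"}, {IP: "10.20.30.40"}}}}
//	sort.Sort(lbs) // lbs.Ingress[0].IP == "10.20.30.40"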

View File

@ -17,13 +17,30 @@ limitations under the License.
package service
import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/golang/glog"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
func TestGetClusterConditionPredicate(t *testing.T) {
@ -83,3 +100,270 @@ func TestGetClusterConditionPredicate(t *testing.T) {
}
}
}
const (
retryInterval = 100 * time.Millisecond
clusters string = "clusters"
services string = "services"
endpoints string = "endpoints"
lbIngress1 = "10.20.30.40"
lbIngress2 = "10.20.30.50"
serviceEndpoint1 = "192.168.0.1"
serviceEndpoint2 = "192.168.1.1"
)
func TestServiceController(t *testing.T) {
glog.Infof("Creating fake infrastructure")
fedClient := &fakefedclientset.Clientset{}
cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1")
cluster2 := NewClusterWithRegionZone("cluster2", v1.ConditionTrue, "region2", "zone2")
RegisterFakeClusterGet(&fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}})
RegisterFakeList(clusters, &fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}})
fedclusterWatch := RegisterFakeWatch(clusters, &fedClient.Fake)
RegisterFakeList(services, &fedClient.Fake, &v1.ServiceList{Items: []v1.Service{}})
fedServiceWatch := RegisterFakeWatch(services, &fedClient.Fake)
RegisterFakeOnCreate(clusters, &fedClient.Fake, fedclusterWatch)
RegisterFakeOnUpdate(clusters, &fedClient.Fake, fedclusterWatch)
RegisterFakeOnCreate(services, &fedClient.Fake, fedServiceWatch)
RegisterFakeOnUpdate(services, &fedClient.Fake, fedServiceWatch)
cluster1Client := &fakekubeclientset.Clientset{}
RegisterFakeList(services, &cluster1Client.Fake, &v1.ServiceList{Items: []v1.Service{}})
c1ServiceWatch := RegisterFakeWatch(services, &cluster1Client.Fake)
RegisterFakeList(endpoints, &cluster1Client.Fake, &v1.EndpointsList{Items: []v1.Endpoints{}})
c1EndpointWatch := RegisterFakeWatch(endpoints, &cluster1Client.Fake)
RegisterFakeOnCreate(services, &cluster1Client.Fake, c1ServiceWatch)
RegisterFakeOnUpdate(services, &cluster1Client.Fake, c1ServiceWatch)
RegisterFakeOnCreate(endpoints, &cluster1Client.Fake, c1EndpointWatch)
RegisterFakeOnUpdate(endpoints, &cluster1Client.Fake, c1EndpointWatch)
cluster2Client := &fakekubeclientset.Clientset{}
RegisterFakeList(services, &cluster2Client.Fake, &v1.ServiceList{Items: []v1.Service{}})
c2ServiceWatch := RegisterFakeWatch(services, &cluster2Client.Fake)
RegisterFakeList(endpoints, &cluster2Client.Fake, &v1.EndpointsList{Items: []v1.Endpoints{}})
c2EndpointWatch := RegisterFakeWatch(endpoints, &cluster2Client.Fake)
RegisterFakeOnCreate(services, &cluster2Client.Fake, c2ServiceWatch)
RegisterFakeOnUpdate(services, &cluster2Client.Fake, c2ServiceWatch)
RegisterFakeOnCreate(endpoints, &cluster2Client.Fake, c2EndpointWatch)
RegisterFakeOnUpdate(endpoints, &cluster2Client.Fake, c2EndpointWatch)
fedInformerClientFactory := func(cluster *v1beta1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
}
}
fakedns, _ := clouddns.NewFakeInterface()
sc := New(fedClient, fakedns, "myfederation", "federation.example.com", "example.com", "")
ToFederatedInformerForTestOnly(sc.federatedInformer).SetClientFactory(fedInformerClientFactory)
ToFederatedInformerForTestOnly(sc.endpointFederatedInformer).SetClientFactory(fedInformerClientFactory)
sc.clusterAvailableDelay = 100 * time.Millisecond
sc.reviewDelay = 50 * time.Millisecond
sc.updateTimeout = 5 * time.Second
stop := make(chan struct{})
glog.Infof("Running Service Controller")
go sc.Run(5, stop)
glog.Infof("Adding cluster 1")
fedclusterWatch.Add(cluster1)
service := NewService("test-service-1", 80)
// Test add federated service.
glog.Infof("Adding federated service")
fedServiceWatch.Add(service)
key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String()
glog.Infof("Test service was correctly created in cluster 1")
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
glog.Infof("Adding cluster 2")
fedclusterWatch.Add(cluster2)
glog.Infof("Test service was correctly created in cluster 2")
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster2.Name,
key, service, wait.ForeverTestTimeout))
glog.Infof("Test federation service is updated when cluster1 service status is updated")
service.Status = v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{
{IP: lbIngress1},
}}}
desiredStatus := service.Status
desiredService := &v1.Service{Status: desiredStatus}
c1ServiceWatch.Modify(service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout))
glog.Infof("Test federation service is updated when cluster1 endpoint for the service is created")
desiredIngressAnnotation := NewFederatedServiceIngress().
AddEndpoints("cluster1", []string{lbIngress1}).
String()
desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}}
c1EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint1))
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout))
glog.Infof("Test federation service is updated when cluster2 service status is updated")
service.Status = v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{
{IP: lbIngress2},
}}}
desiredStatus.LoadBalancer.Ingress = append(desiredStatus.LoadBalancer.Ingress, v1.LoadBalancerIngress{IP: lbIngress2})
desiredService = &v1.Service{Status: desiredStatus}
c2ServiceWatch.Modify(service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster2.Name,
key, service, wait.ForeverTestTimeout))
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout))
glog.Infof("Test federation service is updated when cluster2 endpoint for the service is created")
desiredIngressAnnotation = NewFederatedServiceIngress().
AddEndpoints("cluster1", []string{lbIngress1}).
AddEndpoints("cluster2", []string{lbIngress2}).
String()
desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}}
c2EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint2))
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout))
glog.Infof("Test federation service is updated when cluster1 endpoint for the service is deleted")
desiredIngressAnnotation = NewFederatedServiceIngress().
AddEndpoints("cluster2", []string{lbIngress2}).
String()
desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}}
c1EndpointWatch.Delete(NewEndpoint("test-service-1", serviceEndpoint1))
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout))
// Test update federated service.
glog.Infof("Test modifying federated service by changing the port")
service.Spec.Ports[0].Port = 9090
fedServiceWatch.Modify(service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
// Test cluster service is recreated when deleted.
glog.Infof("Test cluster service is recreated when deleted")
c1ServiceWatch.Delete(service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
close(stop)
}
func NewService(name string, port int32) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: v1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/services/" + name,
Labels: map[string]string{"app": name},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{Port: port}},
Type: v1.ServiceTypeLoadBalancer,
},
}
}
func NewEndpoint(name, ip string) *v1.Endpoints {
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: v1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/endpoints/" + name,
Labels: map[string]string{"app": name},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: ip,
}}},
},
}
}
// NewClusterWithRegionZone builds a new cluster object with given region and zone attributes.
func NewClusterWithRegionZone(name string, readyStatus v1.ConditionStatus, region, zone string) *v1beta1.Cluster {
cluster := NewCluster(name, readyStatus)
cluster.Status.Zones = []string{zone}
cluster.Status.Region = region
return cluster
}
// WaitForClusterService waits until the service in the given cluster matches the desiredService.
func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, desiredService *v1.Service, timeout time.Duration) error {
return wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
obj, found, err := store.GetByKey(clusterName, key)
if !found || err != nil {
return false, err
}
service := obj.(*v1.Service)
if !Equivalent(service, desiredService) {
glog.V(5).Infof("Waiting for clustered service, Desired: %v, Current: %v", desiredService, service)
return false, nil
}
glog.V(5).Infof("Clustered service is up to date: %v", service)
return true, nil
})
}
type serviceCompare func(current, desired *v1.Service) (match bool)
func serviceStatusCompare(current, desired *v1.Service) bool {
if !reflect.DeepEqual(current.Status.LoadBalancer, desired.Status.LoadBalancer) {
glog.V(5).Infof("Waiting for loadbalancer status, Current: %v, Desired: %v", current.Status.LoadBalancer, desired.Status.LoadBalancer)
return false
}
glog.V(5).Infof("Loadbalancer status match: %v", current.Status.LoadBalancer)
return true
}
func serviceIngressCompare(current, desired *v1.Service) bool {
if current.Annotations[FederatedServiceIngressAnnotation] != desired.Annotations[FederatedServiceIngressAnnotation] {
glog.V(5).Infof("Waiting for loadbalancer ingress, Current: %v, Desired: %v", current.Annotations[FederatedServiceIngressAnnotation], desired.Annotations[FederatedServiceIngressAnnotation])
return false
}
glog.V(5).Infof("Loadbalancer ingress match: %v", current.Annotations[FederatedServiceIngressAnnotation])
return true
}
// WaitForFederatedServiceUpdate waits for federated service updates to match the desiredService.
func WaitForFederatedServiceUpdate(t *testing.T, store corelisters.ServiceLister, key string, desiredService *v1.Service, match serviceCompare, timeout time.Duration) error {
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
}
service, err := store.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
return false, nil
case err != nil:
return false, err
case !match(service, desiredService):
return false, nil
default:
return true, nil
}
})
return err
}

View File

@ -77,3 +77,35 @@ func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkgruntime.Object)) *cache.
},
}
}
// NewTriggerOnMetaAndFieldChanges returns cache.ResourceEventHandlerFuncs that trigger the given function
// on object add/delete, or when ObjectMeta or the given field changes on update.
func NewTriggerOnMetaAndFieldChanges(field string, triggerFunc func(pkgruntime.Object)) *cache.ResourceEventHandlerFuncs {
getFieldOrPanic := func(obj interface{}, fieldName string) interface{} {
val := reflect.ValueOf(obj).Elem().FieldByName(fieldName)
if !val.IsValid() {
panic(fmt.Errorf("field not found: %s", fieldName))
}
return val.Interface()
}
return &cache.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldObj := old.(pkgruntime.Object)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkgruntime.Object)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkgruntime.Object)
oldMeta := getFieldOrPanic(old, "ObjectMeta").(metav1.ObjectMeta)
curMeta := getFieldOrPanic(cur, "ObjectMeta").(metav1.ObjectMeta)
if !ObjectMetaEquivalent(oldMeta, curMeta) ||
!reflect.DeepEqual(getFieldOrPanic(old, field), getFieldOrPanic(cur, field)) {
triggerFunc(curObj)
}
},
}
}
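// Hypothetical usage sketch (lister-watcher, resync period, and handler name are
// assumed): reconcile a service whenever its metadata or Status field changes,
// while ignoring updates that only touch other fields such as Spec:
//
//	store, controller := cache.NewInformer(lw, &v1.Service{}, resyncPeriod,
//		NewTriggerOnMetaAndFieldChanges("Status", func(obj pkgruntime.Object) {
//			deliverService(obj.(*v1.Service))
//		}))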

View File

@ -181,6 +181,38 @@ func RegisterFakeList(resource string, client *core.Fake, obj runtime.Object) {
})
}
// RegisterFakeClusterGet registers a get response for the cluster resource inside the given fake client.
func RegisterFakeClusterGet(client *core.Fake, obj runtime.Object) {
clusterList, ok := obj.(*federationapi.ClusterList)
client.AddReactor("get", "clusters", func(action core.Action) (bool, runtime.Object, error) {
name := action.(core.GetAction).GetName()
if ok {
for _, cluster := range clusterList.Items {
if cluster.Name == name {
return true, &cluster, nil
}
}
}
return false, nil, fmt.Errorf("could not find the requested cluster: %s", name)
})
}
// RegisterFakeOnCreate registers a reactor in the given fake client that passes
// all created objects to the given watcher.
func RegisterFakeOnCreate(resource string, client *core.Fake, watcher *WatcherDispatcher) {
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
originalObj := createAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in a goroutine.
obj := copy(originalObj)
watcher.orderExecution <- func() {
glog.V(4).Infof("Object created: %v", obj)
watcher.Add(obj)
}
return true, originalObj, nil
})
}
// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
// all created objects to the given watcher and also copies them to a channel for
// in-test inspection.
@ -201,6 +233,32 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
return objChan
}
// RegisterFakeOnUpdate registers a reactor in the given fake client that passes
// all updated objects to the given watcher.
func RegisterFakeOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) {
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
updateAction := action.(core.UpdateAction)
originalObj := updateAction.GetObject()
glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject())
// Create a copy of the object here to prevent data races while reading the object in a goroutine.
obj := copy(originalObj)
operation := func() {
glog.V(4).Infof("Object updated: %v", obj)
watcher.Modify(obj)
}
select {
case watcher.orderExecution <- operation:
case <-time.After(pushTimeout):
glog.Errorf("Fake client execution channel blocked")
glog.Errorf("Tried to push %v", updateAction)
}
return true, originalObj, nil
})
}
// RegisterFakeCopyOnUpdate registers a reactor in the given fake client that passes
// all updated objects to the given watcher and also copies them to a channel for
// in-test inspection.
@ -313,6 +371,8 @@ func NewCluster(name string, readyStatus apiv1.ConditionStatus) *federationapi.C
Conditions: []federationapi.ClusterCondition{
{Type: federationapi.ClusterReady, Status: readyStatus},
},
Zones: []string{"foozone"},
Region: "fooregion",
},
}
}