From c355a2ac96b470b1ed57bc0204fb9a3b4d5f2a35 Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Wed, 2 Aug 2017 00:09:37 +0800 Subject: [PATCH] Paramaterize stickyMaxAgeMinutes for service in API --- examples/meteor/meteor-service.json | 5 + pkg/api/types.go | 29 ++++++ pkg/api/validation/validation.go | 38 ++++++- pkg/api/validation/validation_test.go | 90 ++++++++++++++++ pkg/master/controller.go | 6 ++ pkg/master/controller_test.go | 120 +++++++++++++++++++--- pkg/proxy/iptables/proxier.go | 11 +- pkg/proxy/iptables/proxier_test.go | 4 +- pkg/proxy/userspace/loadbalancer.go | 2 +- pkg/proxy/userspace/proxier.go | 15 +-- pkg/proxy/userspace/roundrobin.go | 22 ++-- pkg/proxy/userspace/roundrobin_test.go | 14 +-- pkg/proxy/winuserspace/proxier.go | 7 +- pkg/proxy/winuserspace/roundrobin.go | 22 ++-- pkg/proxy/winuserspace/roundrobin_test.go | 14 +-- staging/src/k8s.io/api/core/v1/types.go | 19 ++++ 16 files changed, 349 insertions(+), 69 deletions(-) diff --git a/examples/meteor/meteor-service.json b/examples/meteor/meteor-service.json index 2dc55a041bb..2e494c0a2c1 100644 --- a/examples/meteor/meteor-service.json +++ b/examples/meteor/meteor-service.json @@ -15,6 +15,11 @@ "name": "meteor" }, "sessionAffinity": "ClientIP", + "sessionAffinityConfig": { + "clientIP": { + "timeoutSeconds": 90 + } + }, "type": "LoadBalancer" } } diff --git a/pkg/api/types.go b/pkg/api/types.go index a84ee8da130..7359453dafc 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -2660,6 +2660,31 @@ const ( ServiceAffinityNone ServiceAffinity = "None" ) +const ( + // DefaultClientIPServiceAffinitySeconds is the default timeout seconds + // of Client IP based session affinity - 3 hours. + DefaultClientIPServiceAffinitySeconds int32 = 10800 + // MaxClientIPServiceAffinitySeconds is the max timeout seconds + // of Client IP based session affinity - 1 day. + MaxClientIPServiceAffinitySeconds int32 = 86400 +) + +// SessionAffinityConfig represents the configurations of session affinity. +type SessionAffinityConfig struct { + // clientIP contains the configurations of Client IP based session affinity. + // +optional + ClientIP *ClientIPConfig +} + +// ClientIPConfig represents the configurations of Client IP based session affinity. +type ClientIPConfig struct { + // timeoutSeconds specifies the seconds of ClientIP type session sticky time. + // The value must be >0 && <=86400(for 1 day) if ServiceAffinity == "ClientIP". + // Default value is 10800(for 3 hours). + // +optional + TimeoutSeconds *int32 +} + // Service Type string describes ingress methods for a service type ServiceType string @@ -2787,6 +2812,10 @@ type ServiceSpec struct { // +optional SessionAffinity ServiceAffinity + // sessionAffinityConfig contains the configurations of session affinity. + // +optional + SessionAffinityConfig *SessionAffinityConfig + // Optional: If specified and supported by the platform, this will restrict traffic through the cloud-provider // load-balancer will be restricted to the specified client IPs. This field will be ignored if the // cloud-provider does not support the feature." diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 73301784c01..9544388908b 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -19,6 +19,7 @@ package validation import ( "encoding/json" "fmt" + "math" "net" "path" "path/filepath" @@ -28,8 +29,6 @@ import ( "github.com/golang/glog" - "math" - "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" @@ -1873,6 +1872,33 @@ func validateProbe(probe *api.Probe, fldPath *field.Path) field.ErrorList { return allErrs } +func validateClientIPAffinityConfig(config *api.SessionAffinityConfig, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if config == nil { + allErrs = append(allErrs, field.Required(fldPath, fmt.Sprintf("when session affinity type is %s", api.ServiceAffinityClientIP))) + return allErrs + } + if config.ClientIP == nil { + allErrs = append(allErrs, field.Required(fldPath.Child("clientIP"), fmt.Sprintf("when session affinity type is %s", api.ServiceAffinityClientIP))) + return allErrs + } + if config.ClientIP.TimeoutSeconds == nil { + allErrs = append(allErrs, field.Required(fldPath.Child("clientIP").Child("timeoutSeconds"), fmt.Sprintf("when session affinity type is %s", api.ServiceAffinityClientIP))) + return allErrs + } + allErrs = append(allErrs, validateAffinityTimeout(config.ClientIP.TimeoutSeconds, fldPath.Child("clientIP").Child("timeoutSeconds"))...) + + return allErrs +} + +func validateAffinityTimeout(timeout *int32, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if *timeout <= 0 || *timeout > api.MaxClientIPServiceAffinitySeconds { + allErrs = append(allErrs, field.Invalid(fldPath, timeout, fmt.Sprintf("must be greater than 0 and less than %d", api.MaxClientIPServiceAffinitySeconds))) + } + return allErrs +} + // AccumulateUniqueHostPorts extracts each HostPort of each Container, // accumulating the results and returning an error if any ports conflict. func AccumulateUniqueHostPorts(containers []api.Container, accumulator *sets.String, fldPath *field.Path) field.ErrorList { @@ -2914,6 +2940,14 @@ func ValidateService(service *api.Service) field.ErrorList { allErrs = append(allErrs, field.NotSupported(specPath.Child("sessionAffinity"), service.Spec.SessionAffinity, supportedSessionAffinityType.List())) } + if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { + allErrs = append(allErrs, validateClientIPAffinityConfig(service.Spec.SessionAffinityConfig, specPath.Child("sessionAffinityConfig"))...) + } else if service.Spec.SessionAffinity == api.ServiceAffinityNone { + if service.Spec.SessionAffinityConfig != nil { + allErrs = append(allErrs, field.Forbidden(specPath.Child("sessionAffinityConfig"), fmt.Sprintf("must not be set when session affinity is %s", string(api.ServiceAffinityNone)))) + } + } + if helper.IsServiceIPSet(service) { if ip := net.ParseIP(service.Spec.ClusterIP); ip == nil { allErrs = append(allErrs, field.Invalid(specPath.Child("clusterIP"), service.Spec.ClusterIP, "must be empty, 'None', or a valid IP address")) diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 50e36fe6b6a..bdd514ceb07 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -6792,6 +6792,32 @@ func TestValidateService(t *testing.T) { numErrs: 0, }, // ESIPP section ends. + { + name: "invalid timeoutSeconds field", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + s.Spec.SessionAffinity = api.ServiceAffinityClientIP + s.Spec.SessionAffinityConfig = &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(-1), + }, + } + }, + numErrs: 1, + }, + { + name: "sessionAffinityConfig can't be set when session affinity is None", + tweakSvc: func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer + s.Spec.SessionAffinity = api.ServiceAffinityNone + s.Spec.SessionAffinityConfig = &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(90), + }, + } + }, + numErrs: 1, + }, } for _, tc := range testCases { @@ -8193,6 +8219,11 @@ func TestValidateServiceUpdate(t *testing.T) { name: "change affinity", tweakSvc: func(oldSvc, newSvc *api.Service) { newSvc.Spec.SessionAffinity = "ClientIP" + newSvc.Spec.SessionAffinityConfig = &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(90), + }, + } }, numErrs: 0, }, @@ -10314,3 +10345,62 @@ func TestValidateFlexVolumeSource(t *testing.T) { } } } + +func TestValidateOrSetClientIPAffinityConfig(t *testing.T) { + successCases := map[string]*api.SessionAffinityConfig{ + "non-empty config, valid timeout: 1": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(1), + }, + }, + "non-empty config, valid timeout: api.MaxClientIPServiceAffinitySeconds-1": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(int(api.MaxClientIPServiceAffinitySeconds - 1)), + }, + }, + "non-empty config, valid timeout: api.MaxClientIPServiceAffinitySeconds": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(int(api.MaxClientIPServiceAffinitySeconds)), + }, + }, + } + + for name, test := range successCases { + if errs := validateClientIPAffinityConfig(test, field.NewPath("field")); len(errs) != 0 { + t.Errorf("case: %s, expected success: %v", name, errs) + } + } + + errorCases := map[string]*api.SessionAffinityConfig{ + "empty session affinity config": nil, + "empty client IP config": { + ClientIP: nil, + }, + "empty timeoutSeconds": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: nil, + }, + }, + "non-empty config, invalid timeout: api.MaxClientIPServiceAffinitySeconds+1": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(int(api.MaxClientIPServiceAffinitySeconds + 1)), + }, + }, + "non-empty config, invalid timeout: -1": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(-1), + }, + }, + "non-empty config, invalid timeout: 0": { + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: newInt32(0), + }, + }, + } + + for name, test := range errorCases { + if errs := validateClientIPAffinityConfig(test, field.NewPath("field")); len(errs) == 0 { + t.Errorf("case: %v, expected failures: %v", name, errs) + } + } +} diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 0e922e17b82..e38718c6167 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -250,6 +250,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser } return nil } + timeoutSeconds := api.DefaultClientIPServiceAffinitySeconds svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, @@ -263,6 +264,11 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser ClusterIP: serviceIP.String(), SessionAffinity: api.ServiceAffinityClientIP, Type: serviceType, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, }, } diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 50dc1bab734..f0011223347 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -546,6 +546,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { om := func(name string) metav1.ObjectMeta { return metav1.ObjectMeta{Namespace: ns, Name: name} } + timeoutSeconds := api.DefaultClientIPServiceAffinitySeconds create_tests := []struct { testName string @@ -570,7 +571,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -625,7 +631,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -637,7 +648,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -658,7 +674,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -671,7 +692,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -691,7 +717,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -703,7 +734,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -723,7 +759,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -735,7 +776,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -755,7 +801,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -767,7 +818,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -787,7 +843,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: &api.Service{ @@ -799,7 +860,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -819,7 +885,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeNodePort, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeNodePort, }, }, expectUpdate: &api.Service{ @@ -831,7 +902,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, }, @@ -851,7 +927,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: nil, @@ -910,7 +991,12 @@ func TestCreateOrUpdateMasterService(t *testing.T) { Selector: nil, ClusterIP: "1.2.3.4", SessionAffinity: api.ServiceAffinityClientIP, - Type: api.ServiceTypeClusterIP, + SessionAffinityConfig: &api.SessionAffinityConfig{ + ClientIP: &api.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + }, + Type: api.ServiceTypeClusterIP, }, }, expectUpdate: nil, diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6f2e6668427..4479a2e1128 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -143,7 +143,7 @@ type serviceInfo struct { nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int + stickyMaxAgeSeconds int externalIPs []string loadBalancerSourceRanges []string onlyNodeLocalEndpoints bool @@ -194,6 +194,10 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se apiservice.RequestsOnlyLocalTraffic(service) { onlyNodeLocalEndpoints = true } + var stickyMaxAgeSeconds int + if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { + stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) + } info := &serviceInfo{ clusterIP: net.ParseIP(service.Spec.ClusterIP), port: int(port.Port), @@ -202,11 +206,12 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se // Deep-copy in case the service instance changes loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + stickyMaxAgeSeconds: stickyMaxAgeSeconds, externalIPs: make([]string, len(service.Spec.ExternalIPs)), loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, } + copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.externalIPs, service.Spec.ExternalIPs) @@ -1440,7 +1445,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(svcChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeSeconds), "--reap", "-j", string(endpointChain)) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 35f288c53b3..311fbd88ab1 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -178,8 +178,8 @@ func TestGetChainLinesMultipleTables(t *testing.T) { func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { return &serviceInfo{ - sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + sessionAffinityType: api.ServiceAffinityNone, // default + stickyMaxAgeSeconds: int(api.DefaultClientIPServiceAffinitySeconds), // default clusterIP: ip, port: port, protocol: protocol, diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index 2bf23d5b940..369094e3296 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -28,7 +28,7 @@ type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given // service-port and source address. NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) - NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error + NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeSeconds int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) ServiceHasEndpoints(service proxy.ServicePortName) bool diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 7d3502ce79d..d299544da2e 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -61,7 +61,7 @@ type ServiceInfo struct { nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int + stickyMaxAgeSeconds int // Deprecated, but required for back-compat (including e2e) externalIPs []string } @@ -378,15 +378,13 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return nil, err } si := &ServiceInfo{ - Timeout: timeout, - ActiveClients: newClientCache(), - + Timeout: timeout, + ActiveClients: newClientCache(), isAliveAtomic: 1, proxyPort: portNum, protocol: protocol, socket: sock, sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. } proxier.setServiceInfo(service, si) @@ -450,12 +448,17 @@ func (proxier *Proxier) mergeService(service *api.Service) sets.String { info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) info.nodePort = int(servicePort.NodePort) info.sessionAffinityType = service.Spec.SessionAffinity + // Set session affinity timeout value when sessionAffinity==ClientIP + if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { + info.stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) + } + glog.V(4).Infof("info: %#v", info) if err := proxier.openPortal(serviceName, info); err != nil { glog.Errorf("Failed to open portal for %q: %v", serviceName, err) } - proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) + proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds) } return existingPorts diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index b610ebbbd76..b32d69d8776 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -48,7 +48,7 @@ type affinityState struct { type affinityPolicy struct { affinityType api.ServiceAffinity affinityMap map[string]*affinityState // map client IP -> affinity info - ttlMinutes int + ttlSeconds int } // LoadBalancerRR is a round-robin load balancer. @@ -66,11 +66,11 @@ type balancerState struct { affinity affinityPolicy } -func newAffinityPolicy(affinityType api.ServiceAffinity, ttlMinutes int) *affinityPolicy { +func newAffinityPolicy(affinityType api.ServiceAffinity, ttlSeconds int) *affinityPolicy { return &affinityPolicy{ affinityType: affinityType, affinityMap: make(map[string]*affinityState), - ttlMinutes: ttlMinutes, + ttlSeconds: ttlSeconds, } } @@ -81,22 +81,22 @@ func NewLoadBalancerRR() *LoadBalancerRR { } } -func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { +func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlSeconds int) error { glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) lb.lock.Lock() defer lb.lock.Unlock() - lb.newServiceInternal(svcPort, affinityType, ttlMinutes) + lb.newServiceInternal(svcPort, affinityType, ttlSeconds) return nil } // This assumes that lb.lock is already held. -func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState { - if ttlMinutes == 0 { - ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimited instead???? +func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlSeconds int) *balancerState { + if ttlSeconds == 0 { + ttlSeconds = int(api.DefaultClientIPServiceAffinitySeconds) //default to 3 hours if not specified. Should 0 be unlimited instead???? } if _, exists := lb.services[svcPort]; !exists { - lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} + lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)} glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) } else if affinityType != "" { lb.services[svcPort].affinity.affinityType = affinityType @@ -159,7 +159,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne } if !sessionAffinityReset { sessionAffinity, exists := state.affinity.affinityMap[ipaddr] - if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Seconds()) < state.affinity.ttlSeconds { // Affinity wins. endpoint := sessionAffinity.endpoint sessionAffinity.lastUsed = time.Now() @@ -378,7 +378,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa return } for ip, affinity := range state.affinity.affinityMap { - if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes { + if int(time.Now().Sub(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds { glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) delete(state.affinity.affinityMap, ip) } diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index de3a64de77b..05ccbd37a81 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -357,7 +357,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { } // Call NewService() before OnEndpointsUpdate() - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -421,7 +421,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { }, } loadBalancer.OnEndpointsAdd(endpoints) - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -473,7 +473,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -546,7 +546,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -605,7 +605,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ @@ -616,7 +616,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} - loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(barService, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ @@ -674,7 +674,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { } // Call NewService() before OnEndpointsUpdate() - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 03fd9546c45..4e6382918c6 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -36,7 +36,6 @@ import ( ) const allAvailableInterfaces string = "" -const stickyMaxAgeMinutes int = 180 // TODO: parameterize this in the API. type portal struct { ip string @@ -360,7 +359,11 @@ func (proxier *Proxier) mergeService(service *api.Service) map[ServicePortPortal }, Port: servicePort.Name, } - proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) + timeoutSeconds := 0 + if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { + timeoutSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) + } + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, timeoutSeconds) } } diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index ff2026bd8fc..86d7614c850 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -48,7 +48,7 @@ type affinityState struct { type affinityPolicy struct { affinityType api.ServiceAffinity affinityMap map[string]*affinityState // map client IP -> affinity info - ttlMinutes int + ttlSeconds int } // LoadBalancerRR is a round-robin load balancer. @@ -66,11 +66,11 @@ type balancerState struct { affinity affinityPolicy } -func newAffinityPolicy(affinityType api.ServiceAffinity, ttlMinutes int) *affinityPolicy { +func newAffinityPolicy(affinityType api.ServiceAffinity, ttlSeconds int) *affinityPolicy { return &affinityPolicy{ affinityType: affinityType, affinityMap: make(map[string]*affinityState), - ttlMinutes: ttlMinutes, + ttlSeconds: ttlSeconds, } } @@ -81,22 +81,22 @@ func NewLoadBalancerRR() *LoadBalancerRR { } } -func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { +func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlSeconds int) error { glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) lb.lock.Lock() defer lb.lock.Unlock() - lb.newServiceInternal(svcPort, affinityType, ttlMinutes) + lb.newServiceInternal(svcPort, affinityType, ttlSeconds) return nil } // This assumes that lb.lock is already held. -func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState { - if ttlMinutes == 0 { - ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimited instead???? +func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlSeconds int) *balancerState { + if ttlSeconds == 0 { + ttlSeconds = int(api.DefaultClientIPServiceAffinitySeconds) //default to 3 hours if not specified. Should 0 be unlimited instead???? } if _, exists := lb.services[svcPort]; !exists { - lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} + lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)} glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) } else if affinityType != "" { lb.services[svcPort].affinity.affinityType = affinityType @@ -149,7 +149,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne } if !sessionAffinityReset { sessionAffinity, exists := state.affinity.affinityMap[ipaddr] - if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Seconds()) < state.affinity.ttlSeconds { // Affinity wins. endpoint := sessionAffinity.endpoint sessionAffinity.lastUsed = time.Now() @@ -366,7 +366,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa return } for ip, affinity := range state.affinity.affinityMap { - if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes { + if int(time.Now().Sub(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds { glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) delete(state.affinity.affinityMap, ip) } diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index b334a3ccc24..1f4973ee1a9 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -357,7 +357,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { } // Call NewService() before OnEndpointsUpdate() - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -421,7 +421,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { }, } loadBalancer.OnEndpointsAdd(endpoints) - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -473,7 +473,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -546,7 +546,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ @@ -605,7 +605,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ @@ -616,7 +616,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} - loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(barService, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ @@ -674,7 +674,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { } // Call NewService() before OnEndpointsUpdate() - loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, int(api.DefaultClientIPServiceAffinitySeconds)) endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index c3de989d358..e5e58c0d84f 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -2998,6 +2998,22 @@ const ( ServiceAffinityNone ServiceAffinity = "None" ) +// SessionAffinityConfig represents the configurations of session affinity. +type SessionAffinityConfig struct { + // clientIP contains the configurations of Client IP based session affinity. + // +optional + ClientIP *ClientIPConfig `json:"clientIP,omitempty" protobuf:"bytes,1,opt,name=clientIP"` +} + +// ClientIPConfig represents the configurations of Client IP based session affinity. +type ClientIPConfig struct { + // timeoutSeconds specifies the seconds of ClientIP type session sticky time. + // The value must be >0 && <=86400(for 1 day) if ServiceAffinity == "ClientIP". + // Default value is 10800(for 3 hours). + // +optional + TimeoutSeconds *int32 `json:"timeoutSeconds,omitempty" protobuf:"varint,1,opt,name=timeoutSeconds"` +} + // Service Type string describes ingress methods for a service type ServiceType string @@ -3172,6 +3188,9 @@ type ServiceSpec struct { // field. // +optional PublishNotReadyAddresses bool `json:"publishNotReadyAddresses,omitempty" protobuf:"varint,13,opt,name=publishNotReadyAddresses"` + // sessionAffinityConfig contains the configurations of session affinity. + // +optional + SessionAffinityConfig *SessionAffinityConfig `json:"sessionAffinityConfig,omitempty" protobuf:"bytes,14,opt,name=sessionAffinityConfig"` } // ServicePort contains information on service's port.