[kube-proxy] Move Service/EndpointInfo common codes to change tracker

This commit is contained in:
Zihong Zheng
2018-02-16 19:09:33 -08:00
parent 249ecab74e
commit b485f7b5b4
9 changed files with 701 additions and 844 deletions

View File

@@ -17,6 +17,8 @@ limitations under the License.
package proxy
import (
"fmt"
"net"
"reflect"
"sync"
@@ -24,10 +26,91 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
apiservice "k8s.io/kubernetes/pkg/api/service"
api "k8s.io/kubernetes/pkg/apis/core"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/apis/core/helper"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)
// ServiceInfoCommon contains common service information.
type ServiceInfoCommon struct {
ClusterIP net.IP
Port int
Protocol api.Protocol
NodePort int
LoadBalancerStatus api.LoadBalancerStatus
SessionAffinityType api.ServiceAffinity
StickyMaxAgeSeconds int
ExternalIPs []string
LoadBalancerSourceRanges []string
HealthCheckNodePort int
OnlyNodeLocalEndpoints bool
}
var _ ServicePort = &ServiceInfoCommon{}
// String is part of ServicePort interface.
func (info *ServiceInfoCommon) String() string {
return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol)
}
// GetClusterIP is part of ServicePort interface.
func (info *ServiceInfoCommon) GetClusterIP() string {
return info.ClusterIP.String()
}
// GetProtocol is part of ServicePort interface.
func (info *ServiceInfoCommon) GetProtocol() api.Protocol {
return info.Protocol
}
// GetHealthCheckNodePort is part of ServicePort interface.
func (info *ServiceInfoCommon) GetHealthCheckNodePort() int {
return info.HealthCheckNodePort
}
func (sct *ServiceChangeTracker) newServiceInfoCommon(port *api.ServicePort, service *api.Service) *ServiceInfoCommon {
onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) {
onlyNodeLocalEndpoints = true
}
var stickyMaxAgeSeconds int
if service.Spec.SessionAffinity == api.ServiceAffinityClientIP {
// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
}
info := &ServiceInfoCommon{
ClusterIP: net.ParseIP(service.Spec.ClusterIP),
Port: int(port.Port),
Protocol: port.Protocol,
NodePort: int(port.NodePort),
// Deep-copy in case the service instance changes
LoadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
SessionAffinityType: service.Spec.SessionAffinity,
StickyMaxAgeSeconds: stickyMaxAgeSeconds,
OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
}
info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs))
info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges))
copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.ExternalIPs, service.Spec.ExternalIPs)
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
glog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name)
} else {
info.HealthCheckNodePort = int(p)
}
}
return info
}
type customizeServiceInfoFunc func(*api.ServicePort, *api.Service, *ServiceInfoCommon) ServicePort
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
@@ -43,12 +126,20 @@ type ServiceChangeTracker struct {
lock sync.Mutex
// items maps a service to its serviceChange.
items map[types.NamespacedName]*serviceChange
// customizeServiceInfo allows proxier to inject customized infomation when processing service.
customizeServiceInfo customizeServiceInfoFunc
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
isIPv6Mode *bool
recorder record.EventRecorder
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker() *ServiceChangeTracker {
func NewServiceChangeTracker(customizeServiceInfo customizeServiceInfoFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
items: make(map[types.NamespacedName]*serviceChange),
customizeServiceInfo: customizeServiceInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
}
}
@@ -60,10 +151,7 @@ func NewServiceChangeTracker() *ServiceChangeTracker {
// - pass <oldService, service> as the <previous, current> pair.
// Delete item
// - pass <service, nil> as the <previous, current> pair.
//
// makeServicePort() return a proxy.ServicePort based on the given Service and its ServicePort. We inject makeServicePort()
// so that giving caller side a chance to initialize proxy.ServicePort interface.
func (sct *ServiceChangeTracker) Update(previous, current *api.Service, makeServicePort func(servicePort *api.ServicePort, service *api.Service) ServicePort) bool {
func (sct *ServiceChangeTracker) Update(previous, current *api.Service) bool {
svc := current
if svc == nil {
svc = previous
@@ -80,10 +168,10 @@ func (sct *ServiceChangeTracker) Update(previous, current *api.Service, makeServ
change, exists := sct.items[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = serviceToServiceMap(previous, makeServicePort)
change.previous = sct.serviceToServiceMap(previous)
sct.items[namespacedName] = change
}
change.current = serviceToServiceMap(current, makeServicePort)
change.current = sct.serviceToServiceMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(sct.items, namespacedName)
@@ -110,28 +198,26 @@ func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (res
// computing this incrementally similarly to serviceMap.
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.HealthCheckNodePort() != 0 {
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
if info.GetHealthCheckNodePort() != 0 {
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort())
}
}
return result
}
// ServiceMap maps a service to its ServicePort information.
// ServiceMap maps a service to its ServicePort.
type ServiceMap map[ServicePortName]ServicePort
// serviceToServiceMap translates a single Service object to a ServiceMap.
// makeServicePort() return a proxy.ServicePort based on the given Service and its ServicePort. We inject makeServicePort()
// so that giving caller side a chance to initialize proxy.ServicePort interface.
//
// NOTE: service object should NOT be modified.
func serviceToServiceMap(service *api.Service, makeServicePort func(servicePort *api.ServicePort, service *api.Service) ServicePort) ServiceMap {
func (sct *ServiceChangeTracker) serviceToServiceMap(service *api.Service) ServiceMap {
if service == nil {
return nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxyutil.ShouldSkipService(svcName, service) {
if utilproxy.ShouldSkipService(svcName, service) {
return nil
}
@@ -139,7 +225,12 @@ func serviceToServiceMap(service *api.Service, makeServicePort func(servicePort
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[svcPortName] = makeServicePort(servicePort, service)
svcInfoCommon := sct.newServiceInfoCommon(servicePort, service)
if sct.customizeServiceInfo != nil {
serviceMap[svcPortName] = sct.customizeServiceInfo(servicePort, service, svcInfoCommon)
} else {
serviceMap[svcPortName] = svcInfoCommon
}
}
return serviceMap
}
@@ -213,8 +304,8 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
info, exists := (*sm)[svcPortName]
if exists {
glog.V(1).Infof("Removing service port %q", svcPortName)
if info.Protocol() == api.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIP())
if info.GetProtocol() == api.ProtocolUDP {
UDPStaleClusterIP.Insert(info.GetClusterIP())
}
delete(*sm, svcPortName)
} else {