mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			344 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			344 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2017 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package proxy
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"net"
 | 
						|
	"reflect"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/golang/glog"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						|
	"k8s.io/client-go/tools/record"
 | 
						|
	apiservice "k8s.io/kubernetes/pkg/api/service"
 | 
						|
	api "k8s.io/kubernetes/pkg/apis/core"
 | 
						|
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
 | 
						|
	utilnet "k8s.io/kubernetes/pkg/util/net"
 | 
						|
)
 | 
						|
 | 
						|
// BaseServiceInfo contains base information that defines a service.
 | 
						|
// This could be used directly by proxier while processing services,
 | 
						|
// or can be used for constructing a more specific ServiceInfo struct
 | 
						|
// defined by the proxier if needed.
 | 
						|
type BaseServiceInfo struct {
 | 
						|
	ClusterIP                net.IP
 | 
						|
	Port                     int
 | 
						|
	Protocol                 api.Protocol
 | 
						|
	NodePort                 int
 | 
						|
	LoadBalancerStatus       api.LoadBalancerStatus
 | 
						|
	SessionAffinityType      api.ServiceAffinity
 | 
						|
	StickyMaxAgeSeconds      int
 | 
						|
	ExternalIPs              []string
 | 
						|
	LoadBalancerSourceRanges []string
 | 
						|
	HealthCheckNodePort      int
 | 
						|
	OnlyNodeLocalEndpoints   bool
 | 
						|
}
 | 
						|
 | 
						|
var _ ServicePort = &BaseServiceInfo{}
 | 
						|
 | 
						|
// String is part of ServicePort interface.
 | 
						|
func (info *BaseServiceInfo) String() string {
 | 
						|
	return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol)
 | 
						|
}
 | 
						|
 | 
						|
// ClusterIPString is part of ServicePort interface.
 | 
						|
func (info *BaseServiceInfo) ClusterIPString() string {
 | 
						|
	return info.ClusterIP.String()
 | 
						|
}
 | 
						|
 | 
						|
// GetProtocol is part of ServicePort interface.
 | 
						|
func (info *BaseServiceInfo) GetProtocol() api.Protocol {
 | 
						|
	return info.Protocol
 | 
						|
}
 | 
						|
 | 
						|
// GetHealthCheckNodePort is part of ServicePort interface.
 | 
						|
func (info *BaseServiceInfo) GetHealthCheckNodePort() int {
 | 
						|
	return info.HealthCheckNodePort
 | 
						|
}
 | 
						|
 | 
						|
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *api.ServicePort, service *api.Service) *BaseServiceInfo {
 | 
						|
	onlyNodeLocalEndpoints := false
 | 
						|
	if apiservice.RequestsOnlyLocalTraffic(service) {
 | 
						|
		onlyNodeLocalEndpoints = true
 | 
						|
	}
 | 
						|
	var stickyMaxAgeSeconds int
 | 
						|
	if service.Spec.SessionAffinity == api.ServiceAffinityClientIP {
 | 
						|
		// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
 | 
						|
		stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
 | 
						|
	}
 | 
						|
	info := &BaseServiceInfo{
 | 
						|
		ClusterIP: net.ParseIP(service.Spec.ClusterIP),
 | 
						|
		Port:      int(port.Port),
 | 
						|
		Protocol:  port.Protocol,
 | 
						|
		NodePort:  int(port.NodePort),
 | 
						|
		// Deep-copy in case the service instance changes
 | 
						|
		LoadBalancerStatus:     *service.Status.LoadBalancer.DeepCopy(),
 | 
						|
		SessionAffinityType:    service.Spec.SessionAffinity,
 | 
						|
		StickyMaxAgeSeconds:    stickyMaxAgeSeconds,
 | 
						|
		OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
 | 
						|
	}
 | 
						|
 | 
						|
	if sct.isIPv6Mode == nil {
 | 
						|
		info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs))
 | 
						|
		info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges))
 | 
						|
		copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
 | 
						|
		copy(info.ExternalIPs, service.Spec.ExternalIPs)
 | 
						|
	} else {
 | 
						|
		// Filter out the incorrect IP version case.
 | 
						|
		// If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions,
 | 
						|
		// only filter out the incorrect ones.
 | 
						|
		var incorrectIPs []string
 | 
						|
		info.ExternalIPs, incorrectIPs = utilnet.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode)
 | 
						|
		if len(incorrectIPs) > 0 {
 | 
						|
			utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
 | 
						|
		}
 | 
						|
		info.LoadBalancerSourceRanges, incorrectIPs = utilnet.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode)
 | 
						|
		if len(incorrectIPs) > 0 {
 | 
						|
			utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if apiservice.NeedsHealthCheck(service) {
 | 
						|
		p := service.Spec.HealthCheckNodePort
 | 
						|
		if p == 0 {
 | 
						|
			glog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name)
 | 
						|
		} else {
 | 
						|
			info.HealthCheckNodePort = int(p)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return info
 | 
						|
}
 | 
						|
 | 
						|
type makeServicePortFunc func(*api.ServicePort, *api.Service, *BaseServiceInfo) ServicePort
 | 
						|
 | 
						|
// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
 | 
						|
// changes are accumulated, i.e. previous is state from before applying the changes,
 | 
						|
// current is state after applying all of the changes.
 | 
						|
type serviceChange struct {
 | 
						|
	previous ServiceMap
 | 
						|
	current  ServiceMap
 | 
						|
}
 | 
						|
 | 
						|
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
 | 
						|
// Services, keyed by their namespace and name.
 | 
						|
type ServiceChangeTracker struct {
 | 
						|
	// lock protects items.
 | 
						|
	lock sync.Mutex
 | 
						|
	// items maps a service to its serviceChange.
 | 
						|
	items map[types.NamespacedName]*serviceChange
 | 
						|
	// makeServiceInfo allows proxier to inject customized information when processing service.
 | 
						|
	makeServiceInfo makeServicePortFunc
 | 
						|
	// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
 | 
						|
	isIPv6Mode *bool
 | 
						|
	recorder   record.EventRecorder
 | 
						|
}
 | 
						|
 | 
						|
// NewServiceChangeTracker initializes a ServiceChangeTracker
 | 
						|
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker {
 | 
						|
	return &ServiceChangeTracker{
 | 
						|
		items:           make(map[types.NamespacedName]*serviceChange),
 | 
						|
		makeServiceInfo: makeServiceInfo,
 | 
						|
		isIPv6Mode:      isIPv6Mode,
 | 
						|
		recorder:        recorder,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Update updates given service's change map based on the <previous, current> service pair.  It returns true if items changed,
 | 
						|
// otherwise return false.  Update can be used to add/update/delete items of ServiceChangeMap.  For example,
 | 
						|
// Add item
 | 
						|
//   - pass <nil, service> as the <previous, current> pair.
 | 
						|
// Update item
 | 
						|
//   - pass <oldService, service> as the <previous, current> pair.
 | 
						|
// Delete item
 | 
						|
//   - pass <service, nil> as the <previous, current> pair.
 | 
						|
func (sct *ServiceChangeTracker) Update(previous, current *api.Service) bool {
 | 
						|
	svc := current
 | 
						|
	if svc == nil {
 | 
						|
		svc = previous
 | 
						|
	}
 | 
						|
	// previous == nil && current == nil is unexpected, we should return false directly.
 | 
						|
	if svc == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
 | 
						|
 | 
						|
	sct.lock.Lock()
 | 
						|
	defer sct.lock.Unlock()
 | 
						|
 | 
						|
	change, exists := sct.items[namespacedName]
 | 
						|
	if !exists {
 | 
						|
		change = &serviceChange{}
 | 
						|
		change.previous = sct.serviceToServiceMap(previous)
 | 
						|
		sct.items[namespacedName] = change
 | 
						|
	}
 | 
						|
	change.current = sct.serviceToServiceMap(current)
 | 
						|
	// if change.previous equal to change.current, it means no change
 | 
						|
	if reflect.DeepEqual(change.previous, change.current) {
 | 
						|
		delete(sct.items, namespacedName)
 | 
						|
	}
 | 
						|
	return len(sct.items) > 0
 | 
						|
}
 | 
						|
 | 
						|
// UpdateServiceMapResult is the updated results after applying service changes.
 | 
						|
type UpdateServiceMapResult struct {
 | 
						|
	// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.
 | 
						|
	// The value(uint16) of HCServices map is the service health check node port.
 | 
						|
	HCServiceNodePorts map[types.NamespacedName]uint16
 | 
						|
	// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports.
 | 
						|
	// Callers can use this to abort timeout-waits or clear connection-tracking information.
 | 
						|
	UDPStaleClusterIP sets.String
 | 
						|
}
 | 
						|
 | 
						|
// UpdateServiceMap updates ServiceMap based on the given changes.
 | 
						|
func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
 | 
						|
	result.UDPStaleClusterIP = sets.NewString()
 | 
						|
	serviceMap.apply(changes, result.UDPStaleClusterIP)
 | 
						|
 | 
						|
	// TODO: If this will appear to be computationally expensive, consider
 | 
						|
	// computing this incrementally similarly to serviceMap.
 | 
						|
	result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
 | 
						|
	for svcPortName, info := range serviceMap {
 | 
						|
		if info.GetHealthCheckNodePort() != 0 {
 | 
						|
			result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort())
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return result
 | 
						|
}
 | 
						|
 | 
						|
// ServiceMap maps a service to its ServicePort.
 | 
						|
type ServiceMap map[ServicePortName]ServicePort
 | 
						|
 | 
						|
// serviceToServiceMap translates a single Service object to a ServiceMap.
 | 
						|
//
 | 
						|
// NOTE: service object should NOT be modified.
 | 
						|
func (sct *ServiceChangeTracker) serviceToServiceMap(service *api.Service) ServiceMap {
 | 
						|
	if service == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						|
	if utilproxy.ShouldSkipService(svcName, service) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if len(service.Spec.ClusterIP) != 0 {
 | 
						|
		// Filter out the incorrect IP version case.
 | 
						|
		// If ClusterIP on service has incorrect IP version, service itself will be ignored.
 | 
						|
		if sct.isIPv6Mode != nil && utilnet.IsIPv6String(service.Spec.ClusterIP) != *sct.isIPv6Mode {
 | 
						|
			utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "clusterIP", service.Spec.ClusterIP, service.Namespace, service.Name, service.UID)
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	serviceMap := make(ServiceMap)
 | 
						|
	for i := range service.Spec.Ports {
 | 
						|
		servicePort := &service.Spec.Ports[i]
 | 
						|
		svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
 | 
						|
		baseSvcInfo := sct.newBaseServiceInfo(servicePort, service)
 | 
						|
		if sct.makeServiceInfo != nil {
 | 
						|
			serviceMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
 | 
						|
		} else {
 | 
						|
			serviceMap[svcPortName] = baseSvcInfo
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return serviceMap
 | 
						|
}
 | 
						|
 | 
						|
// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
 | 
						|
// udp protocol service cluster ip when service is deleted from the ServiceMap.
 | 
						|
func (serviceMap *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
 | 
						|
	changes.lock.Lock()
 | 
						|
	defer changes.lock.Unlock()
 | 
						|
	for _, change := range changes.items {
 | 
						|
		serviceMap.merge(change.current)
 | 
						|
		// filter out the Update event of current changes from previous changes before calling unmerge() so that can
 | 
						|
		// skip deleting the Update events.
 | 
						|
		change.previous.filter(change.current)
 | 
						|
		serviceMap.unmerge(change.previous, UDPStaleClusterIP)
 | 
						|
	}
 | 
						|
	// clear changes after applying them to ServiceMap.
 | 
						|
	changes.items = make(map[types.NamespacedName]*serviceChange)
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// merge adds other ServiceMap's elements to current ServiceMap.
 | 
						|
// If collision, other ALWAYS win. Otherwise add the other to current.
 | 
						|
// In other words, if some elements in current collisions with other, update the current by other.
 | 
						|
// It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users
 | 
						|
// tell if a service is deleted or updated.
 | 
						|
// The returned value is one of the arguments of ServiceMap.unmerge().
 | 
						|
// ServiceMap A Merge ServiceMap B will do following 2 things:
 | 
						|
//   * update ServiceMap A.
 | 
						|
//   * produce a string set which stores all other ServiceMap's ServicePortName.String().
 | 
						|
// For example,
 | 
						|
//   - A{}
 | 
						|
//   - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
 | 
						|
//     - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
 | 
						|
//     - produce string set {"ns/cluster-ip:http"}
 | 
						|
//   - A{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 345, "UDP"}}
 | 
						|
//   - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
 | 
						|
//     - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
 | 
						|
//     - produce string set {"ns/cluster-ip:http"}
 | 
						|
func (sm *ServiceMap) merge(other ServiceMap) sets.String {
 | 
						|
	// existingPorts is going to store all identifiers of all services in `other` ServiceMap.
 | 
						|
	existingPorts := sets.NewString()
 | 
						|
	for svcPortName, info := range other {
 | 
						|
		// Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts.
 | 
						|
		existingPorts.Insert(svcPortName.String())
 | 
						|
		_, exists := (*sm)[svcPortName]
 | 
						|
		if !exists {
 | 
						|
			glog.V(1).Infof("Adding new service port %q at %s", svcPortName, info.String())
 | 
						|
		} else {
 | 
						|
			glog.V(1).Infof("Updating existing service port %q at %s", svcPortName, info.String())
 | 
						|
		}
 | 
						|
		(*sm)[svcPortName] = info
 | 
						|
	}
 | 
						|
	return existingPorts
 | 
						|
}
 | 
						|
 | 
						|
// filter filters out elements from ServiceMap base on given ports string sets.
 | 
						|
func (sm *ServiceMap) filter(other ServiceMap) {
 | 
						|
	for svcPortName := range *sm {
 | 
						|
		// skip the delete for Update event.
 | 
						|
		if _, ok := other[svcPortName]; ok {
 | 
						|
			delete(*sm, svcPortName)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// unmerge deletes all other ServiceMap's elements from current ServiceMap.  We pass in the UDPStaleClusterIP strings sets
 | 
						|
// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later
 | 
						|
func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
 | 
						|
	for svcPortName := range other {
 | 
						|
		info, exists := (*sm)[svcPortName]
 | 
						|
		if exists {
 | 
						|
			glog.V(1).Infof("Removing service port %q", svcPortName)
 | 
						|
			if info.GetProtocol() == api.ProtocolUDP {
 | 
						|
				UDPStaleClusterIP.Insert(info.ClusterIPString())
 | 
						|
			}
 | 
						|
			delete(*sm, svcPortName)
 | 
						|
		} else {
 | 
						|
			glog.Errorf("Service port %q doesn't exists", svcPortName)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |